Author: ningjiang
Date: Wed Apr 7 01:46:24 2010
New Revision: 931403
URL: http://svn.apache.org/viewvc?rev=931403&view=rev
Log:
CAMEL-2618 Do not use ProducerTemplate internally to send to same destination
Modified:
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
Modified:
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java?rev=931403&r1=931402&r2=931403&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/transport/CamelConduit.java Wed Apr 7 01:46:24 2010
@@ -22,9 +22,13 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.cxf.CxfConstants;
import org.apache.camel.component.cxf.util.CxfHeaderHelper;
import org.apache.camel.component.cxf.util.CxfMessageHelper;
@@ -53,6 +57,7 @@ public class CamelConduit extends Abstra
private CamelContext camelContext;
private EndpointInfo endpointInfo;
private String targetCamelEndpointUri;
+ private Producer producer;
private ProducerTemplate camelTemplate;
private Bus bus;
private HeaderFilterStrategy headerFilterStrategy;
@@ -80,6 +85,13 @@ public class CamelConduit extends Abstra
bus = b;
initConfig();
this.headerFilterStrategy = headerFilterStrategy;
+ Endpoint target =
getCamelContext().getEndpoint(targetCamelEndpointUri);
+ try {
+ producer = target.createProducer();
+ producer.start();
+ } catch (Exception e) {
+ throw new RuntimeCamelException("Cannot create the producer rightly", e);
+ }
}
public void setCamelContext(CamelContext context) {
@@ -102,7 +114,12 @@ public class CamelConduit extends Abstra
public void close() {
getLogger().log(Level.FINE, "CamelConduit closed ");
-
+ // shutdown the producer
+ try {
+ producer.stop();
+ } catch (Exception e) {
+ getLogger().log(Level.WARNING, "CamelConduit producer stop with the exception", e);
+ }
}
protected Logger getLogger() {
@@ -126,6 +143,7 @@ public class CamelConduit extends Abstra
}
}
+ @Deprecated
public ProducerTemplate getCamelTemplate() {
if (camelTemplate == null) {
camelTemplate = getCamelContext().createProducerTemplate();
@@ -133,6 +151,7 @@ public class CamelConduit extends Abstra
return camelTemplate;
}
+ @Deprecated
public void setCamelTemplate(ProducerTemplate template) {
camelTemplate = template;
}
@@ -167,23 +186,28 @@ public class CamelConduit extends Abstra
pattern = ExchangePattern.InOut;
}
getLogger().log(Level.FINE, "send the message to endpoint" +
targetCamelEndpointUri);
- // We could wait for the rely asynchronously
- org.apache.camel.Exchange exchange =
getCamelTemplate().send(targetCamelEndpointUri, pattern, new Processor() {
- public void process(org.apache.camel.Exchange ex) throws
IOException {
- CachedOutputStream outputStream =
(CachedOutputStream)outMessage.getContent(OutputStream.class);
- // Send out the request message here, copy the
protocolHeader back
- CxfHeaderHelper.propagateCxfToCamel(headerFilterStrategy,
outMessage, ex.getIn().getHeaders(), ex);
-
- // TODO support different encoding
- ex.getIn().setBody(outputStream.getBytes());
- getLogger().log(Level.FINE, "template sending request: ",
ex.getIn());
- }
- });
- exchange.setProperty(CxfConstants.CXF_EXCHANGE,
outMessage.getExchange());
+ org.apache.camel.Exchange exchange =
producer.createExchange(pattern);
+
+ exchange.setProperty(Exchange.TO_ENDPOINT, targetCamelEndpointUri);
+ CachedOutputStream outputStream =
(CachedOutputStream)outMessage.getContent(OutputStream.class);
+ // Send out the request message here, copy the protocolHeader back
+ CxfHeaderHelper.propagateCxfToCamel(headerFilterStrategy,
outMessage, exchange.getIn().getHeaders(), exchange);
+
+ // TODO support different encoding
+ exchange.getIn().setBody(outputStream.getBytes());
+ getLogger().log(Level.FINE, "template sending request: ",
exchange.getIn());
+ Exception exception = null;
+ try {
+ producer.process(exchange);
+ } catch (Exception ex) {
+ exception = ex;
+ }
// Throw the exception that the template get
- if (exchange.getException() != null) {
+ exception = exchange.getException();
+ if (exception != null) {
throw IOHelper.createIOException("Can't send the request message.", exchange.getException());
}
+ exchange.setProperty(CxfConstants.CXF_EXCHANGE,
outMessage.getExchange());
if (!isOneWay) {
handleResponse(exchange);
}