Sure, will commit soon.
Thanks
Freeman
Guillaume Nodet wrote:
Freeman, I think this modification should also be applied to the
ConsumerEndpoint in servicemix-http...
On Tue, Mar 31, 2009 at 04:13, <[email protected]> wrote:
Author: ffang
Date: Tue Mar 31 02:13:00 2009
New Revision: 760291
URL: http://svn.apache.org/viewvc?rev=760291&view=rev
Log:
[SMXCOMP-493] STFlow doesn't work with servicemix-http/servicemix-cxf-bc
Modified:
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
Modified:
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL:
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=760291&r1=760290&r2=760291&view=diff
==============================================================================
---
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
(original)
+++
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
Tue Mar 31 02:13:00 2009
@@ -71,6 +71,7 @@
protected Map<String, MessageExchange> exchanges;
protected int suspentionTime = 60000;
protected boolean started = false;
+ private boolean isSTFlow;
public ConsumerProcessor(HttpEndpoint endpoint) {
super(endpoint);
@@ -96,21 +97,27 @@
if (cont == null) {
throw new Exception("HTTP request has timed out");
}
- synchronized (cont) {
- if (locks.remove(exchange.getExchangeId()) == null) {
- throw new Exception("HTTP request has timed out");
- }
- if (log.isDebugEnabled()) {
- log.debug("Resuming continuation for exchange: " +
exchange.getExchangeId());
- }
- exchanges.put(exchange.getExchangeId(), exchange);
- cont.resume();
- if (!cont.isResumed()) {
+
+ if (!cont.isPending()) {
+ isSTFlow = true;
+ } else {
+ isSTFlow = false;
+ synchronized (cont) {
+ if (locks.remove(exchange.getExchangeId()) == null) {
+ throw new Exception("HTTP request has timed out");
+ }
if (log.isDebugEnabled()) {
- log.debug("Could not resume continuation for exchange: " +
exchange.getExchangeId());
+ log.debug("Resuming continuation for exchange: " +
exchange.getExchangeId());
+ }
+ exchanges.put(exchange.getExchangeId(), exchange);
+ cont.resume();
+ if (!cont.isResumed()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Could not resume continuation for exchange:
" + exchange.getExchangeId());
+ }
+ exchanges.remove(exchange.getExchangeId());
+ throw new Exception("HTTP request has timed out for exchange:
" + exchange.getExchangeId());
}
- exchanges.remove(exchange.getExchangeId());
- throw new Exception("HTTP request has timed out for exchange:
" + exchange.getExchangeId());
}
}
}
@@ -168,15 +175,22 @@
request.setAttribute(MessageExchange.class.getName(),
exchange.getExchangeId());
synchronized (cont) {
channel.send(exchange);
- if (log.isDebugEnabled()) {
- log.debug("Suspending continuation for exchange: " +
exchange.getExchangeId());
- }
- boolean result = cont.suspend(suspentionTime);
- exchange = exchanges.remove(exchange.getExchangeId());
- request.removeAttribute(MessageExchange.class.getName());
- if (!result) {
- locks.remove(exchange.getExchangeId());
- throw new Exception("Exchange timed out");
+ if (!isSTFlow) {
+ if (log.isDebugEnabled()) {
+ log.debug("Suspending continuation for exchange: "
+ exchange.getExchangeId());
+ }
+ boolean result = cont.suspend(suspentionTime);
+ exchange = exchanges.remove(exchange.getExchangeId());
+
request.removeAttribute(MessageExchange.class.getName());
+ if (!result) {
+ locks.remove(exchange.getExchangeId());
+ throw new Exception("Exchange timed out");
+ }
+ } else {
+ String id = (String)
request.getAttribute(MessageExchange.class.getName());
+ locks.remove(id);
+ exchange = exchanges.remove(id);
+
request.removeAttribute(MessageExchange.class.getName());
}
}
} catch (RetryRequest retry) {