Freeman, I think this modification should be done on the ConsumerEndpoint too in servicemix-http...
On Tue, Mar 31, 2009 at 04:13, <[email protected]> wrote: > Author: ffang > Date: Tue Mar 31 02:13:00 2009 > New Revision: 760291 > > URL: http://svn.apache.org/viewvc?rev=760291&view=rev > Log: > [SMXCOMP-493]STFlow doesn't work with servicemix-http/servicemix-cxf-bc > > Modified: > > servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java > > Modified: > servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java > URL: > http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=760291&r1=760290&r2=760291&view=diff > ============================================================================== > --- > servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java > (original) > +++ > servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java > Tue Mar 31 02:13:00 2009 > @@ -71,6 +71,7 @@ > protected Map<String, MessageExchange> exchanges; > protected int suspentionTime = 60000; > protected boolean started = false; > + private boolean isSTFlow; > > public ConsumerProcessor(HttpEndpoint endpoint) { > super(endpoint); > @@ -96,21 +97,27 @@ > if (cont == null) { > throw new Exception("HTTP request has timed out"); > } > - synchronized (cont) { > - if (locks.remove(exchange.getExchangeId()) == null) { > - throw new Exception("HTTP request has timed out"); > - } > - if (log.isDebugEnabled()) { > - log.debug("Resuming continuation for exchange: " + > exchange.getExchangeId()); > - } > - exchanges.put(exchange.getExchangeId(), exchange); > - cont.resume(); > - if (!cont.isResumed()) { > + > + if (!cont.isPending()) { > + isSTFlow = true; > + } else { > + isSTFlow = false; > + synchronized (cont) { > + if (locks.remove(exchange.getExchangeId()) == null) { > + throw new Exception("HTTP request has timed out"); > + } > if (log.isDebugEnabled()) { > - log.debug("Could not resume continuation for exchange: " > + exchange.getExchangeId()); > + log.debug("Resuming continuation for exchange: " + > exchange.getExchangeId()); > + } > + exchanges.put(exchange.getExchangeId(), exchange); > + cont.resume(); > + if (!cont.isResumed()) { > + if (log.isDebugEnabled()) { > + log.debug("Could not resume continuation for > exchange: " + exchange.getExchangeId()); > + } > + exchanges.remove(exchange.getExchangeId()); > + throw new Exception("HTTP request has timed out for > exchange: " + exchange.getExchangeId()); > } > - exchanges.remove(exchange.getExchangeId()); > - throw new Exception("HTTP request has timed out for > exchange: " + exchange.getExchangeId()); > } > } > } > @@ -168,15 +175,22 @@ > request.setAttribute(MessageExchange.class.getName(), > exchange.getExchangeId()); > synchronized (cont) { > channel.send(exchange); > - if (log.isDebugEnabled()) { > - log.debug("Suspending continuation for exchange: " + > exchange.getExchangeId()); > - } > - boolean result = cont.suspend(suspentionTime); > - exchange = exchanges.remove(exchange.getExchangeId()); > - request.removeAttribute(MessageExchange.class.getName()); > - if (!result) { > - locks.remove(exchange.getExchangeId()); > - throw new Exception("Exchange timed out"); > + if (!isSTFlow) { > + if (log.isDebugEnabled()) { > + log.debug("Suspending continuation for exchange: > " + exchange.getExchangeId()); > + } > + boolean result = cont.suspend(suspentionTime); > + exchange = > exchanges.remove(exchange.getExchangeId()); > + > request.removeAttribute(MessageExchange.class.getName()); > + if (!result) { > + locks.remove(exchange.getExchangeId()); > + throw new Exception("Exchange timed out"); > + } > + } else { > + String id = (String) > request.getAttribute(MessageExchange.class.getName()); > + locks.remove(id); > + exchange = exchanges.remove(id); > + > request.removeAttribute(MessageExchange.class.getName()); > } > } > } catch (RetryRequest retry) { > > > -- Cheers, Guillaume Nodet ------------------------ Blog: http://gnodet.blogspot.com/ ------------------------ Open Source SOA http://fusesource.com
