Freeman, I think this modification should be done on the
ConsumerEndpoint too in servicemix-http...

On Tue, Mar 31, 2009 at 04:13,  <[email protected]> wrote:
> Author: ffang
> Date: Tue Mar 31 02:13:00 2009
> New Revision: 760291
>
> URL: http://svn.apache.org/viewvc?rev=760291&view=rev
> Log:
> [SMXCOMP-493]STFlow doesn't work with servicemix-http/servicemix-cxf-bc
>
> Modified:
>    
> servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
>
> Modified: 
> servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
> URL: 
> http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=760291&r1=760290&r2=760291&view=diff
> ==============================================================================
> --- 
> servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
>  (original)
> +++ 
> servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
>  Tue Mar 31 02:13:00 2009
> @@ -71,6 +71,7 @@
>     protected Map<String, MessageExchange> exchanges;
>     protected int suspentionTime = 60000;
>     protected boolean started = false;
> +    private boolean isSTFlow;
>
>     public ConsumerProcessor(HttpEndpoint endpoint) {
>         super(endpoint);
> @@ -96,21 +97,27 @@
>         if (cont == null) {
>             throw new Exception("HTTP request has timed out");
>         }
> -        synchronized (cont) {
> -            if (locks.remove(exchange.getExchangeId()) == null) {
> -                throw new Exception("HTTP request has timed out");
> -            }
> -            if (log.isDebugEnabled()) {
> -                log.debug("Resuming continuation for exchange: " + 
> exchange.getExchangeId());
> -            }
> -            exchanges.put(exchange.getExchangeId(), exchange);
> -            cont.resume();
> -            if (!cont.isResumed()) {
> +
> +        if (!cont.isPending()) {
> +            isSTFlow = true;
> +        } else {
> +            isSTFlow = false;
> +            synchronized (cont) {
> +                if (locks.remove(exchange.getExchangeId()) == null) {
> +                    throw new Exception("HTTP request has timed out");
> +                }
>                 if (log.isDebugEnabled()) {
> -                    log.debug("Could not resume continuation for exchange: " 
> + exchange.getExchangeId());
> +                    log.debug("Resuming continuation for exchange: " + 
> exchange.getExchangeId());
> +                }
> +                exchanges.put(exchange.getExchangeId(), exchange);
> +                cont.resume();
> +                if (!cont.isResumed()) {
> +                    if (log.isDebugEnabled()) {
> +                        log.debug("Could not resume continuation for 
> exchange: " + exchange.getExchangeId());
> +                    }
> +                    exchanges.remove(exchange.getExchangeId());
> +                    throw new Exception("HTTP request has timed out for 
> exchange: " + exchange.getExchangeId());
>                 }
> -                exchanges.remove(exchange.getExchangeId());
> -                throw new Exception("HTTP request has timed out for 
> exchange: " + exchange.getExchangeId());
>             }
>         }
>     }
> @@ -168,15 +175,22 @@
>                 request.setAttribute(MessageExchange.class.getName(), 
> exchange.getExchangeId());
>                 synchronized (cont) {
>                     channel.send(exchange);
> -                    if (log.isDebugEnabled()) {
> -                        log.debug("Suspending continuation for exchange: " + 
> exchange.getExchangeId());
> -                    }
> -                    boolean result = cont.suspend(suspentionTime);
> -                    exchange = exchanges.remove(exchange.getExchangeId());
> -                    request.removeAttribute(MessageExchange.class.getName());
> -                    if (!result) {
> -                        locks.remove(exchange.getExchangeId());
> -                        throw new Exception("Exchange timed out");
> +                    if (!isSTFlow) {
> +                        if (log.isDebugEnabled()) {
> +                            log.debug("Suspending continuation for exchange: 
> " + exchange.getExchangeId());
> +                        }
> +                        boolean result = cont.suspend(suspentionTime);
> +                        exchange = 
> exchanges.remove(exchange.getExchangeId());
> +                        
> request.removeAttribute(MessageExchange.class.getName());
> +                        if (!result) {
> +                            locks.remove(exchange.getExchangeId());
> +                            throw new Exception("Exchange timed out");
> +                        }
> +                    } else {
> +                        String id = (String) 
> request.getAttribute(MessageExchange.class.getName());
> +                        locks.remove(id);
> +                        exchange = exchanges.remove(id);
> +                        
> request.removeAttribute(MessageExchange.class.getName());
>                     }
>                 }
>             } catch (RetryRequest retry) {
>
>
>



-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com

Reply via email to