ThrottlingInflightRoutePolicy can deadlock
------------------------------------------
Key: CAMEL-4149
URL: https://issues.apache.org/jira/browse/CAMEL-4149
Project: Camel
Issue Type: Bug
Components: camel-core
Affects Versions: 2.7.0
Reporter: Søren Markert

Using ThrottlingInflightRoutePolicy can deadlock a route in some situations.
The unit test pasted in below shows one such situation.

What happens is that the bottom route (from seda:hey) processes its first
exchange and is then suspended by the policy. While it is suspended it does
not take the next exchange from the seda queue, so no further exchange
completes on that route and the policy never checks whether the route should
be resumed.

Moving the resume check into the onExchangeBegin method might work, but only
if that method is still called while the route is suspended.

{code}
import org.apache.camel.Exchange;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultInflightRepository;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy.ThrottlingScope;
import org.apache.camel.test.CamelTestSupport;

public class ThrottleTest extends CamelTestSupport {

    @Produce(uri = "direct:input")
    protected ProducerTemplate input;

    protected MockEndpoint resultEndpoint;

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            public void configure() {
                resultEndpoint = new MockEndpoint("mock:result");
                resultEndpoint.setCamelContext(getContext());

                // Print the context-wide inflight count on every add/remove.
                getContext().setInflightRepository(new DefaultInflightRepository() {
                    @Override
                    public void add(Exchange exchange) {
                        super.add(exchange);
                        System.out.println("    add " + this.size());
                    }

                    @Override
                    public void remove(Exchange exchange) {
                        super.remove(exchange);
                        System.out.println(" remove " + this.size());
                    }
                });

                // Allow at most one inflight exchange, counted across the whole context.
                ThrottlingInflightRoutePolicy throttler = new ThrottlingInflightRoutePolicy();
                throttler.setMaxInflightExchanges(1);
                throttler.setScope(ThrottlingScope.Context);

                // Fan one message out to the seda queue five times, then keep the
                // original exchange inflight for a second.
                from("direct:input")
                    .inOnly("seda:hey", "seda:hey", "seda:hey", "seda:hey", "seda:hey")
                    .delay(1000)
                    .inOnly("log:inputDone");

                // The throttled route: suspended after its first exchange, never resumed.
                from("seda:hey")
                    .routePolicy(throttler)
                    .inOut("log:outputDone")
                    .to(resultEndpoint);
            }
        };
    }

    public void testThatAllExchangesAreReceived() throws Exception {
        // Set the expectation before sending, as is usual for mock endpoints.
        resultEndpoint.expectedMessageCount(5);
        input.sendBody("hello");
        resultEndpoint.assertIsSatisfied();
    }
}
{code}
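
The stall described above can also be illustrated without Camel. The following is only a toy, single-threaded model, not Camel code: the class ThrottleStallModel and its members are hypothetical names invented for this sketch. It assumes that the direct:input exchange is still counted as inflight (it sits in the delay) when the first seda exchange completes, and that an exchange is still counted while its own completion callback runs. The recheckOnAnyCompletion flag models one possible remedy, re-evaluating the throttle whenever any exchange in the context completes; whether that is the right fix for the real policy is not established here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the stall; none of these names are Camel APIs. */
public class ThrottleStallModel {

    static final int MAX_INFLIGHT = 1;

    private final Deque<String> queue = new ArrayDeque<>();
    private int inflight;       // context-wide inflight count
    private boolean suspended;  // state of the throttled route's consumer
    private int consumed;       // exchanges taken from the queue so far

    /** The throttled route: consumes only while its consumer is running. */
    private void drainWhileRunning() {
        while (!suspended && !queue.isEmpty()) {
            queue.poll();
            inflight++;   // exchange begins on the throttled route
            consumed++;
            // Completion callback of the throttled route. In this model it is
            // the only place where the policy is evaluated.
            suspended = inflight > MAX_INFLIGHT;
            inflight--;   // exchange leaves the inflight count
        }
    }

    /**
     * Runs the scenario and returns how many of the five queued exchanges
     * the throttled route consumed.
     */
    public static int run(boolean recheckOnAnyCompletion) {
        ThrottleStallModel m = new ThrottleStallModel();
        for (int i = 0; i < 5; i++) {
            m.queue.add("hello");
        }
        m.inflight = 1;          // the input exchange, parked in its delay

        m.drainWhileRunning();   // first exchange completes, route suspends

        m.inflight--;            // input exchange completes on the other route
        if (recheckOnAnyCompletion && m.inflight <= MAX_INFLIGHT) {
            m.suspended = false; // hypothetical remedy: resume from here
        }

        m.drainWhileRunning();   // no-op if the route is still suspended
        return m.consumed;
    }

    public static void main(String[] args) {
        System.out.println("re-check only on own route: " + run(false) + " of 5 consumed");
        System.out.println("re-check on any completion: " + run(true) + " of 5 consumed");
    }
}
```

In this model the first run stops after a single exchange because nothing ever re-evaluates the throttle once the route is suspended, which mirrors the behaviour reported above. The second run consumes all five.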