Author: boday
Date: Tue Jul 17 17:47:19 2012
New Revision: 1362585
URL: http://svn.apache.org/viewvc?rev=1362585&view=rev
Log:
CAMEL-4327: fixed randomly failing resequencer unit tests by using capacity
instead of timeout; tweaked the rejectOld validation logic to short-circuit
earlier.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362585&r1=1362584&r2=1362585&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Tue Jul 17 17:47:19 2012
@@ -206,6 +206,12 @@ public class ResequencerEngine<E> {
throw new IllegalArgumentException("Element cannot be used in comparator: " + sequence.comparator());
}
+ // validate the exchange shouldn't be 'rejected' (if applicable)
+ if (rejectOld != null && rejectOld.booleanValue() && beforeLastDelivered(element)) {
+ throw new MessageRejectedException("rejecting message [" + element.getObject()
+ + "], it should have been sent before the last delivered message [" + lastDelivered.getObject() + "]");
+ }
+
// add element to sequence in proper order
sequence.add(element);
@@ -222,10 +228,6 @@ public class ResequencerEngine<E> {
// nothing to schedule
} else if (sequence.predecessor(element) != null) {
// nothing to schedule
- } else if (rejectOld != null && rejectOld.booleanValue() && beforeLastDelivered(element)) {
- sequence.remove(element);
- throw new MessageRejectedException("rejecting message [" + element.getObject()
- + "], it should have been sent before the last delivered message [" + lastDelivered.getObject() + "]");
} else {
element.schedule(defineTimeout());
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362585&r1=1362584&r2=1362585&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java Tue Jul 17 17:47:19 2012
@@ -25,54 +25,52 @@ import org.apache.camel.processor.resequ
*/
public class ResequenceStreamRejectOldExchangesTest extends ContextTestSupport {
- public void testInSequenceAfterTimeout() throws Exception {
+ public void testInSequenceAfterCapacityReached() throws Exception {
getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C", "E");
getMockEndpoint("mock:error").expectedMessageCount(0);
template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
- Thread.sleep(1100);
template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
assertMockEndpointsSatisfied();
}
- public void testDuplicateAfterTimeout() throws Exception {
+ public void testDuplicateAfterCapacityReached() throws Exception {
getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C");
getMockEndpoint("mock:error").expectedMessageCount(0);
template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
- Thread.sleep(1100);
template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
assertMockEndpointsSatisfied();
}
- public void testOutOfSequenceAfterTimeoutSimple() throws Exception {
+ public void testOutOfSequenceAfterCapacityReachedSimple() throws Exception {
getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
getMockEndpoint("mock:error").expectedBodiesReceived("A");
template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
- Thread.sleep(1100);
template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
assertMockEndpointsSatisfied();
}
- public void testOutOfSequenceAfterTimeoutComplex() throws Exception {
+
+ public void testOutOfSequenceAfterCapacityReachedComplex() throws Exception {
getMockEndpoint("mock:result").expectedBodiesReceived("A", "D", "E", "F");
getMockEndpoint("mock:error").expectedBodiesReceived("B", "C");
+ template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
- Thread.sleep(1100);
+
template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
- template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
template.sendBodyAndHeader("direct:start", "F", "seqno", 6);
@@ -86,9 +84,9 @@ public class ResequenceStreamRejectOldEx
public void configure() throws Exception {
from("direct:start")
- .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
- .resequence(header("seqno")).stream().timeout(1000).rejectOld()
- .to("mock:result");
+ .onException(MessageRejectedException.class).maximumRedeliveries(0).handled(true).to("mock:error").end()
+ .resequence(header("seqno")).stream().capacity(3).rejectOld()
+ .to("mock:result");
}
};
}
Modified:
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362585&r1=1362584&r2=1362585&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml Tue Jul 17 17:47:19 2012
@@ -30,7 +30,7 @@
<to uri="mock:error"/>
</onException>
<resequence>
- <stream-config capacity="100" timeout="1000">
+ <stream-config capacity="3" timeout="1000">
<rejectOld>true</rejectOld>
</stream-config>
<header>seqno</header>