Author: boday
Date: Tue Jul 17 16:50:06 2012
New Revision: 1362565
URL: http://svn.apache.org/viewvc?rev=1362565&view=rev
Log:
CAMEL-4327 added more complex rejectOld feature unit test and removed rejected
messages from the sequence
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362565&r1=1362564&r2=1362565&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Tue Jul 17 16:50:06 2012
@@ -223,6 +223,7 @@ public class ResequencerEngine<E> {
} else if (sequence.predecessor(element) != null) {
// nothing to schedule
} else if (rejectOld != null && rejectOld.booleanValue() && beforeLastDelivered(element)) {
+ sequence.remove(element);
throw new MessageRejectedException("rejecting message [" + element.getObject()
+ "], it should have been sent before the last delivered message [" + lastDelivered.getObject() + "]");
} else {
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362565&r1=1362564&r2=1362565&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java Tue Jul 17 16:50:06 2012
@@ -51,28 +51,30 @@ public class ResequenceStreamRejectOldEx
assertMockEndpointsSatisfied();
}
- public void testOutOfSequenceAfterTimeout() throws Exception {
- getMockEndpoint("mock:result").expectedBodiesReceived("A", "C", "D");
- getMockEndpoint("mock:error").expectedBodiesReceived("B");
+ public void testOutOfSequenceAfterTimeoutSimple() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
+ getMockEndpoint("mock:error").expectedBodiesReceived("A");
template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
- template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
- Thread.sleep(1100);
template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+ Thread.sleep(1100);
+ template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
assertMockEndpointsSatisfied();
}
- public void testOutOfSequenceAfterTimeout2() throws Exception {
- getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
- getMockEndpoint("mock:error").expectedBodiesReceived("A");
+ public void testOutOfSequenceAfterTimeoutComplex() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("A", "D", "E", "F");
+ getMockEndpoint("mock:error").expectedBodiesReceived("B", "C");
template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
- template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
- template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
- Thread.sleep(1100);
template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
+ Thread.sleep(1100);
+ template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+ template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
+ template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+ template.sendBodyAndHeader("direct:start", "F", "seqno", 6);
assertMockEndpointsSatisfied();
}