Hi
It's better to use an @XmlAttribute for the new option, like the other options on stream config + @XmlElement + private Boolean rejectOld; Then the XML DSL is not as confusing and noisy, as it would just be an attribute instead of an XML tag. Also remember that the Scala DSL may be impacted and need an update. This may happen when you change the DSL. If I remember, I try to run a full test of camel-scala. But it may slip my mind sometimes, and then later it bites you :) On Mon, Jul 16, 2012 at 7:51 PM, <bo...@apache.org> wrote: > Author: boday > Date: Mon Jul 16 17:51:17 2012 > New Revision: 1362163 > > URL: http://svn.apache.org/viewvc?rev=1362163&view=rev > Log: > CAMEL-4327 added "rejectOld" option to the Resequencer EIP to throw an error > if older messages are received after the last delivered message > > Added: > > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java > > camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java > > camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java > > camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml > Modified: > > camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java > > camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java > > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java > > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java > > Modified: > camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java > URL: > http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1362163&r1=1362162&r2=1362163&view=diff > 
============================================================================== > --- > camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java > (original) > +++ > camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java > Mon Jul 16 17:51:17 2012 > @@ -150,6 +150,18 @@ public class ResequenceDefinition extend > } > > /** > + * Sets the rejectOld flag to throw an error when a message older than > the last delivered message is processed > + * @return the builder > + */ > + public ResequenceDefinition rejectOld() { > + if (streamConfig == null) { > + throw new IllegalStateException("rejectOld() only supported for > stream resequencer"); > + } > + streamConfig.setRejectOld(true); > + return this; > + } > + > + /** > * Sets the in batch size for number of exchanges received > * @param batchSize the batch size > * @return the builder > @@ -368,6 +380,7 @@ public class ResequenceDefinition extend > StreamResequencer resequencer = new > StreamResequencer(routeContext.getCamelContext(), processor, comparator); > resequencer.setTimeout(config.getTimeout()); > resequencer.setCapacity(config.getCapacity()); > + resequencer.setRejectOld(config.getRejectOld()); > if (config.getIgnoreInvalidExchanges() != null) { > > resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges()); > } > > Modified: > camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java > URL: > http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1362163&r1=1362162&r2=1362163&view=diff > ============================================================================== > --- > camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java > (original) > +++ > camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java > Mon Jul 16 17:51:17 2012 > @@ -19,6 +19,7 @@ 
package org.apache.camel.model.config; > import javax.xml.bind.annotation.XmlAccessType; > import javax.xml.bind.annotation.XmlAccessorType; > import javax.xml.bind.annotation.XmlAttribute; > +import javax.xml.bind.annotation.XmlElement; > import javax.xml.bind.annotation.XmlRootElement; > import javax.xml.bind.annotation.XmlTransient; > > @@ -41,6 +42,8 @@ public class StreamResequencerConfig ext > private Boolean ignoreInvalidExchanges; > @XmlTransient > private ExpressionResultComparator comparator; > + @XmlElement > + private Boolean rejectOld; > > /** > * Creates a new {@link StreamResequencerConfig} instance using default > @@ -123,5 +126,13 @@ public class StreamResequencerConfig ext > public void setComparator(ExpressionResultComparator comparator) { > this.comparator = comparator; > } > - > + > + public void setRejectOld(boolean value) { > + this.rejectOld = value; > + } > + > + public Boolean getRejectOld() { > + return rejectOld; > + } > + > } > > Modified: > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java > URL: > http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1362163&r1=1362162&r2=1362163&view=diff > ============================================================================== > --- > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java > (original) > +++ > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java > Mon Jul 16 17:51:17 2012 > @@ -141,6 +141,10 @@ public class StreamResequencer extends S > return ignoreInvalidExchanges; > } > > + public void setRejectOld(Boolean rejectOld) { > + engine.setRejectOld(rejectOld); > + } > + > /** > * Sets whether to ignore invalid exchanges which cannot be used by this > stream resequencer. 
> * <p/> > > Added: > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java > URL: > http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java?rev=1362163&view=auto > ============================================================================== > --- > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java > (added) > +++ > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java > Mon Jul 16 17:51:17 2012 > @@ -0,0 +1,32 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > +package org.apache.camel.processor.resequencer; > + > +import org.apache.camel.RuntimeCamelException; > + > +/** > + * An exception thrown if message is rejected by the resequencer > + * > + * @version > + */ > +public class MessageRejectedException extends RuntimeCamelException { > + private static final long serialVersionUID = 5755929795399134568L; > + > + public MessageRejectedException(String message) { > + super(message); > + } > +} > \ No newline at end of file > > Modified: > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java > URL: > http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362163&r1=1362162&r2=1362163&view=diff > ============================================================================== > --- > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java > (original) > +++ > camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java > Mon Jul 16 17:51:17 2012 > @@ -87,6 +87,11 @@ public class ResequencerEngine<E> { > private SequenceSender<E> sequenceSender; > > /** > + * Indicates whether an error should be thrown if message older (based > on Comparator) than the last delivered message is received. > + */ > + private Boolean rejectOld; > + > + /** > * Creates a new resequencer instance with a default timeout of 2000 > * milliseconds. > * > @@ -136,6 +141,14 @@ public class ResequencerEngine<E> { > this.timeout = timeout; > } > > + public Boolean getRejectOld() { > + return rejectOld; > + } > + > + public void setRejectOld(Boolean rejectOld) { > + this.rejectOld = rejectOld; > + } > + > /** > * Returns the sequence sender. 
> * > @@ -209,6 +222,9 @@ public class ResequencerEngine<E> { > // nothing to schedule > } else if (sequence.predecessor(element) != null) { > // nothing to schedule > + } else if (rejectOld != null && rejectOld.booleanValue() && > beforeLastDelivered(element)) { > + throw new MessageRejectedException("rejecting message [" + > element.getObject() > + + "], it should have been sent before the last delivered > message [" + lastDelivered.getObject() + "]"); > } else { > element.schedule(defineTimeout()); > } > @@ -283,6 +299,22 @@ public class ResequencerEngine<E> { > } > > /** > + * Retuns <code>true</code> if the given element is before the last > delivered element. > + * > + * @param element an element. > + * @return <code>true</code> if the given element is before the last > delivered element. > + */ > + private boolean beforeLastDelivered(Element<E> element) { > + if (lastDelivered == null) { > + return false; > + } > + if (sequence.comparator().compare(element, lastDelivered) < 0) { > + return true; > + } > + return false; > + } > + > + /** > * Creates a timeout task based on the timeout setting of this > resequencer. > * > * @return a new timeout task. > > Added: > camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java > URL: > http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto > ============================================================================== > --- > camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java > (added) > +++ > camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java > Mon Jul 16 17:51:17 2012 > @@ -0,0 +1,93 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. 
See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.camel.processor; > + > +import org.apache.camel.ContextTestSupport; > +import org.apache.camel.builder.RouteBuilder; > +import org.apache.camel.processor.resequencer.MessageRejectedException; > + > +/** > + * > + */ > +public class ResequenceStreamRejectOldExchangesTest extends > ContextTestSupport { > + > + public void testInSequenceAfterTimeout() throws Exception { > + getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C", > "E"); > + getMockEndpoint("mock:error").expectedMessageCount(0); > + > + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); > + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); > + template.sendBodyAndHeader("direct:start", "A", "seqno", 1); > + Thread.sleep(1100); > + template.sendBodyAndHeader("direct:start", "E", "seqno", 5); > + > + assertMockEndpointsSatisfied(); > + } > + > + public void testDuplicateAfterTimeout() throws Exception { > + getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C"); > + getMockEndpoint("mock:error").expectedMessageCount(0); > + > + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); > + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); > + template.sendBodyAndHeader("direct:start", "A", "seqno", 1); > + 
Thread.sleep(1100); > + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); > + > + assertMockEndpointsSatisfied(); > + } > + > + public void testOutOfSequenceAfterTimeout() throws Exception { > + getMockEndpoint("mock:result").expectedBodiesReceived("A", "C", "D"); > + getMockEndpoint("mock:error").expectedBodiesReceived("B"); > + > + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); > + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); > + template.sendBodyAndHeader("direct:start", "A", "seqno", 1); > + Thread.sleep(1100); > + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); > + > + assertMockEndpointsSatisfied(); > + } > + > + public void testOutOfSequenceAfterTimeout2() throws Exception { > + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D"); > + getMockEndpoint("mock:error").expectedBodiesReceived("A"); > + > + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); > + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); > + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); > + Thread.sleep(1100); > + template.sendBodyAndHeader("direct:start", "A", "seqno", 1); > + > + assertMockEndpointsSatisfied(); > + } > + > + @Override > + protected RouteBuilder createRouteBuilder() throws Exception { > + return new RouteBuilder() { > + @Override > + public void configure() throws Exception { > + > + from("direct:start") > + > .onException(MessageRejectedException.class).handled(true).to("mock:error").end() > + > .resequence(header("seqno")).stream().timeout(1000).rejectOld() > + .to("mock:result"); > + } > + }; > + } > +} > > Added: > camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto > 
============================================================================== > --- > camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java > (added) > +++ > camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java > Mon Jul 16 17:51:17 2012 > @@ -0,0 +1,32 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > +package org.apache.camel.spring.processor; > + > +import org.apache.camel.CamelContext; > +import org.apache.camel.processor.ResequenceStreamRejectOldExchangesTest; > +import org.apache.camel.processor.ResequencerTest; > + > +import static > org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; > + > +/** > + * @version > + */ > +public class SpringResequenceStreamRejectOldExchangesTest extends > ResequenceStreamRejectOldExchangesTest { > + protected CamelContext createCamelContext() throws Exception { > + return createSpringCamelContext(this, > "org/apache/camel/spring/processor/resequencerRejectOld.xml"); > + } > +} > \ No newline at end of file > > Added: > camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362163&view=auto > ============================================================================== > --- > camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml > (added) > +++ > camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml > Mon Jul 16 17:51:17 2012 > @@ -0,0 +1,42 @@ > +<?xml version="1.0" encoding="UTF-8"?> > +<!-- > + Licensed to the Apache Software Foundation (ASF) under one or more > + contributor license agreements. See the NOTICE file distributed with > + this work for additional information regarding copyright ownership. > + The ASF licenses this file to You under the Apache License, Version 2.0 > + (the "License"); you may not use this file except in compliance with > + the License. 
You may obtain a copy of the License at > + > + http://www.apache.org/licenses/LICENSE-2.0 > + > + Unless required by applicable law or agreed to in writing, software > + distributed under the License is distributed on an "AS IS" BASIS, > + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + See the License for the specific language governing permissions and > + limitations under the License. > +--> > +<beans xmlns="http://www.springframework.org/schema/beans" > + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > + xsi:schemaLocation=" > + http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans.xsd > + http://camel.apache.org/schema/spring > http://camel.apache.org/schema/spring/camel-spring.xsd"> > + > + <camelContext xmlns="http://camel.apache.org/schema/spring"> > + <route> > + <from uri="direct:start"/> > + <onException> > + > <exception>org.apache.camel.processor.resequencer.MessageRejectedException</exception> > + <handled><constant>true</constant></handled> > + <to uri="mock:error"/> > + </onException> > + <resequence> > + <stream-config capacity="100" timeout="1000"> > + <rejectOld>true</rejectOld> > + </stream-config> > + <header>seqno</header> > + <to uri="mock:result"/> > + </resequence> > + </route> > + </camelContext> > + > +</beans> > > -- Claus Ibsen ----------------- FuseSource Email: cib...@fusesource.com Web: http://fusesource.com Twitter: davsclaus, fusenews Blog: http://davsclaus.com Author of Camel in Action: http://www.manning.com/ibsen