funny, I had it that way to start then changed it to an element thinking it was less invasive...I see your point though and just changed it back to use @XmlAttribute instead.
Claus Ibsen-2 wrote > > Hi > > > Its better to use an @XmlAttribute for the new option like the other > options on stream config > > + @XmlElement > + private Boolean rejectOld; > > Then the XML DSL is not as confusing and noisy, as it would just be an > attribute instead of a xml tag. > > Also remember that the Scala DSL may be impacted and needs an update. > This may happen when you change the DSL. > If I remember myself then I try to run a full test of camel-scala. But > it may slip my mind sometimes, and then later i bites you :) > > > > On Mon, Jul 16, 2012 at 7:51 PM, <boday@> wrote: >> Author: boday >> Date: Mon Jul 16 17:51:17 2012 >> New Revision: 1362163 >> >> URL: http://svn.apache.org/viewvc?rev=1362163&view=rev >> Log: >> CAMEL-4327 added "rejectOld" option to the Resequencer EIP to throw an >> error if older messages are received after the last delivered message >> >> Added: >> >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java >> >> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java >> >> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java >> >> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml >> Modified: >> >> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java >> >> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java >> >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java >> >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java >> >> Modified: >> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java >> URL: >> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1362163&r1=1362162&r2=1362163&view=diff >> ============================================================================== >> --- >> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java >> (original) >> +++ >> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java >> Mon Jul 16 17:51:17 2012 >> @@ -150,6 +150,18 @@ public class ResequenceDefinition extend >> } >> >> /** >> + * Sets the rejectOld flag to throw an error when a message older >> than the last delivered message is processed >> + * @return the builder >> + */ >> + public ResequenceDefinition rejectOld() { >> + if (streamConfig == null) { >> + throw new IllegalStateException("rejectOld() only supported >> for stream resequencer"); >> + } >> + streamConfig.setRejectOld(true); >> + return this; >> + } >> + >> + /** >> * Sets the in batch size for number of exchanges received >> * @param batchSize the batch size >> * @return the builder >> @@ -368,6 +380,7 @@ public class ResequenceDefinition extend >> StreamResequencer resequencer = new >> StreamResequencer(routeContext.getCamelContext(), processor, comparator); >> resequencer.setTimeout(config.getTimeout()); >> resequencer.setCapacity(config.getCapacity()); >> + resequencer.setRejectOld(config.getRejectOld()); >> if (config.getIgnoreInvalidExchanges() != null) { >> >> resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges()); >> } >> >> Modified: >> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java >> URL: >> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1362163&r1=1362162&r2=1362163&view=diff >> ============================================================================== >> --- >> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java >> (original) >> +++ >> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java >> Mon Jul 16 17:51:17 2012 >> @@ -19,6 +19,7 @@ package org.apache.camel.model.config; >> import javax.xml.bind.annotation.XmlAccessType; >> import javax.xml.bind.annotation.XmlAccessorType; >> import javax.xml.bind.annotation.XmlAttribute; >> +import javax.xml.bind.annotation.XmlElement; >> import javax.xml.bind.annotation.XmlRootElement; >> import javax.xml.bind.annotation.XmlTransient; >> >> @@ -41,6 +42,8 @@ public class StreamResequencerConfig ext >> private Boolean ignoreInvalidExchanges; >> @XmlTransient >> private ExpressionResultComparator comparator; >> + @XmlElement >> + private Boolean rejectOld; >> >> /** >> * Creates a new {@link StreamResequencerConfig} instance using >> default >> @@ -123,5 +126,13 @@ public class StreamResequencerConfig ext >> public void setComparator(ExpressionResultComparator comparator) { >> this.comparator = comparator; >> } >> - >> + >> + public void setRejectOld(boolean value) { >> + this.rejectOld = value; >> + } >> + >> + public Boolean getRejectOld() { >> + return rejectOld; >> + } >> + >> } >> >> Modified: >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java >> URL: >> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1362163&r1=1362162&r2=1362163&view=diff >> ============================================================================== >> --- >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java >> (original) >> +++ >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java >> Mon Jul 16 17:51:17 2012 >> @@ -141,6 +141,10 @@ public class StreamResequencer extends S >> return ignoreInvalidExchanges; >> } >> >> + public void setRejectOld(Boolean rejectOld) { >> + engine.setRejectOld(rejectOld); >> + } >> + >> /** >> * Sets whether to ignore invalid exchanges which cannot be used by >> this stream resequencer. >> * <p/> >> >> Added: >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java >> URL: >> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java?rev=1362163&view=auto >> ============================================================================== >> --- >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java >> (added) >> +++ >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java >> Mon Jul 16 17:51:17 2012 >> @@ -0,0 +1,32 @@ >> +/** >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >> 2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> +package org.apache.camel.processor.resequencer; >> + >> +import org.apache.camel.RuntimeCamelException; >> + >> +/** >> + * An exception thrown if message is rejected by the resequencer >> + * >> + * @version >> + */ >> +public class MessageRejectedException extends RuntimeCamelException { >> + private static final long serialVersionUID = 5755929795399134568L; >> + >> + public MessageRejectedException(String message) { >> + super(message); >> + } >> +} >> \ No newline at end of file >> >> Modified: >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java >> URL: >> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362163&r1=1362162&r2=1362163&view=diff >> ============================================================================== >> --- >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java >> (original) >> +++ >> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java >> Mon Jul 16 17:51:17 2012 >> @@ -87,6 +87,11 @@ public class ResequencerEngine<E> { >> private SequenceSender<E> sequenceSender; >> >> /** >> + * Indicates whether an error should be thrown if message older >> (based on Comparator) than the last delivered message is received. >> + */ >> + private Boolean rejectOld; >> + >> + /** >> * Creates a new resequencer instance with a default timeout of 2000 >> * milliseconds. >> * >> @@ -136,6 +141,14 @@ public class ResequencerEngine<E> { >> this.timeout = timeout; >> } >> >> + public Boolean getRejectOld() { >> + return rejectOld; >> + } >> + >> + public void setRejectOld(Boolean rejectOld) { >> + this.rejectOld = rejectOld; >> + } >> + >> /** >> * Returns the sequence sender. >> * >> @@ -209,6 +222,9 @@ public class ResequencerEngine<E> { >> // nothing to schedule >> } else if (sequence.predecessor(element) != null) { >> // nothing to schedule >> + } else if (rejectOld != null && rejectOld.booleanValue() && >> beforeLastDelivered(element)) { >> + throw new MessageRejectedException("rejecting message [" + >> element.getObject() >> + + "], it should have been sent before the last >> delivered message [" + lastDelivered.getObject() + "]"); >> } else { >> element.schedule(defineTimeout()); >> } >> @@ -283,6 +299,22 @@ public class ResequencerEngine<E> { >> } >> >> /** >> + * Retuns <code>true</code> if the given element is before the last >> delivered element. >> + * >> + * @param element an element. >> + * @return <code>true</code> if the given element is before the last >> delivered element. >> + */ >> + private boolean beforeLastDelivered(Element<E> element) { >> + if (lastDelivered == null) { >> + return false; >> + } >> + if (sequence.comparator().compare(element, lastDelivered) < 0) { >> + return true; >> + } >> + return false; >> + } >> + >> + /** >> * Creates a timeout task based on the timeout setting of this >> resequencer. >> * >> * @return a new timeout task. >> >> Added: >> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java >> URL: >> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto >> ============================================================================== >> --- >> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java >> (added) >> +++ >> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java >> Mon Jul 16 17:51:17 2012 >> @@ -0,0 +1,93 @@ >> +/** >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >> 2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> +package org.apache.camel.processor; >> + >> +import org.apache.camel.ContextTestSupport; >> +import org.apache.camel.builder.RouteBuilder; >> +import org.apache.camel.processor.resequencer.MessageRejectedException; >> + >> +/** >> + * >> + */ >> +public class ResequenceStreamRejectOldExchangesTest extends >> ContextTestSupport { >> + >> + public void testInSequenceAfterTimeout() throws Exception { >> + getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", >> "C", "E"); >> + getMockEndpoint("mock:error").expectedMessageCount(0); >> + >> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); >> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); >> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1); >> + Thread.sleep(1100); >> + template.sendBodyAndHeader("direct:start", "E", "seqno", 5); >> + >> + assertMockEndpointsSatisfied(); >> + } >> + >> + public void testDuplicateAfterTimeout() throws Exception { >> + getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", >> "C"); >> + getMockEndpoint("mock:error").expectedMessageCount(0); >> + >> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); >> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); >> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1); >> + Thread.sleep(1100); >> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); >> + >> + assertMockEndpointsSatisfied(); >> + } >> + >> + public void testOutOfSequenceAfterTimeout() throws Exception { >> + getMockEndpoint("mock:result").expectedBodiesReceived("A", "C", >> "D"); >> + getMockEndpoint("mock:error").expectedBodiesReceived("B"); >> + >> + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); >> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); >> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1); >> + Thread.sleep(1100); >> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); >> + >> + assertMockEndpointsSatisfied(); >> + } >> + >> + public void testOutOfSequenceAfterTimeout2() throws Exception { >> + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", >> "D"); >> + getMockEndpoint("mock:error").expectedBodiesReceived("A"); >> + >> + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); >> + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); >> + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); >> + Thread.sleep(1100); >> + template.sendBodyAndHeader("direct:start", "A", "seqno", 1); >> + >> + assertMockEndpointsSatisfied(); >> + } >> + >> + @Override >> + protected RouteBuilder createRouteBuilder() throws Exception { >> + return new RouteBuilder() { >> + @Override >> + public void configure() throws Exception { >> + >> + from("direct:start") >> + >> .onException(MessageRejectedException.class).handled(true).to("mock:error").end() >> + >> .resequence(header("seqno")).stream().timeout(1000).rejectOld() >> + .to("mock:result"); >> + } >> + }; >> + } >> +} >> >> Added: >> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java >> URL: >> http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto >> ============================================================================== >> --- >> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java >> (added) >> +++ >> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java >> Mon Jul 16 17:51:17 2012 >> @@ -0,0 +1,32 @@ >> +/** >> + * Licensed to the Apache Software Foundation (ASF) under one or more >> + * contributor license agreements. See the NOTICE file distributed with >> + * this work for additional information regarding copyright ownership. >> + * The ASF licenses this file to You under the Apache License, Version >> 2.0 >> + * (the "License"); you may not use this file except in compliance with >> + * the License. You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. >> + */ >> +package org.apache.camel.spring.processor; >> + >> +import org.apache.camel.CamelContext; >> +import >> org.apache.camel.processor.ResequenceStreamRejectOldExchangesTest; >> +import org.apache.camel.processor.ResequencerTest; >> + >> +import static >> org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; >> + >> +/** >> + * @version >> + */ >> +public class SpringResequenceStreamRejectOldExchangesTest extends >> ResequenceStreamRejectOldExchangesTest { >> + protected CamelContext createCamelContext() throws Exception { >> + return createSpringCamelContext(this, >> "org/apache/camel/spring/processor/resequencerRejectOld.xml"); >> + } >> +} >> \ No newline at end of file >> >> Added: >> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml >> URL: >> http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362163&view=auto >> ============================================================================== >> --- >> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml >> (added) >> +++ >> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml >> Mon Jul 16 17:51:17 2012 >> @@ -0,0 +1,42 @@ >> +<?xml version="1.0" encoding="UTF-8"?> >> + >> +<beans xmlns="http://www.springframework.org/schema/beans" >> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >> + xsi:schemaLocation=" >> + http://www.springframework.org/schema/beans >> http://www.springframework.org/schema/beans/spring-beans.xsd >> + http://camel.apache.org/schema/spring >> http://camel.apache.org/schema/spring/camel-spring.xsd"> >> + >> + <camelContext xmlns="http://camel.apache.org/schema/spring"> >> + <route> >> + <from uri="direct:start"/> >> + <onException> >> + >> <exception>org.apache.camel.processor.resequencer.MessageRejectedException</exception> >> + <handled><constant>true</constant></handled> >> + <to uri="mock:error"/> >> + </onException> >> + <resequence> >> + <stream-config capacity="100" timeout="1000"> >> + <rejectOld>true</rejectOld> >> + </stream-config> >> + <header>seqno</header> >> + <to uri="mock:result"/> >> + </resequence> >> + </route> >> + </camelContext> >> + >> +</beans> >> >> > > > > -- > Claus Ibsen > ----------------- > FuseSource > Email: cibsen@ > Web: http://fusesource.com > Twitter: davsclaus, fusenews > Blog: http://davsclaus.com > Author of Camel in Action: http://www.manning.com/ibsen > ----- Ben O'Day IT Consultant -http://consulting-notes.com -- View this message in context: http://camel.465427.n5.nabble.com/Re-svn-commit-r1362163-in-camel-trunk-camel-core-src-main-java-org-apache-camel-model-camel-core-srcs-tp5716120p5716211.html Sent from the Camel Development mailing list archive at Nabble.com.