Hi

Its better to use an @XmlAttribute for the new option like the other
options on stream config

+    @XmlElement
+    private Boolean rejectOld;

Then the XML DSL is not as confusing and noisy, as it would just be an
attribute instead of a xml tag.

Also remember that the Scala DSL may be impacted and needs an update.
This may happen when you change the DSL.
If I remember myself then I try to run a full test of camel-scala. But
it may slip my mind sometimes, and then later i bites you :)



On Mon, Jul 16, 2012 at 7:51 PM,  <bo...@apache.org> wrote:
> Author: boday
> Date: Mon Jul 16 17:51:17 2012
> New Revision: 1362163
>
> URL: http://svn.apache.org/viewvc?rev=1362163&view=rev
> Log:
> CAMEL-4327 added "rejectOld" option to the Resequencer EIP to throw an error 
> if older messages are received after the last delivered message
>
> Added:
>     
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>     
> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>     
> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>     
> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
> Modified:
>     
> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>     
> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>     
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>     
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>
> Modified: 
> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- 
> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>  (original)
> +++ 
> camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
>  Mon Jul 16 17:51:17 2012
> @@ -150,6 +150,18 @@ public class ResequenceDefinition extend
>      }
>
>      /**
> +     * Sets the rejectOld flag to throw an error when a message older than 
> the last delivered message is processed
> +     * @return the builder
> +     */
> +    public ResequenceDefinition rejectOld() {
> +        if (streamConfig == null) {
> +            throw new IllegalStateException("rejectOld() only supported for 
> stream resequencer");
> +        }
> +        streamConfig.setRejectOld(true);
> +        return this;
> +    }
> +
> +    /**
>       * Sets the in batch size for number of exchanges received
>       * @param batchSize  the batch size
>       * @return the builder
> @@ -368,6 +380,7 @@ public class ResequenceDefinition extend
>          StreamResequencer resequencer = new 
> StreamResequencer(routeContext.getCamelContext(), processor, comparator);
>          resequencer.setTimeout(config.getTimeout());
>          resequencer.setCapacity(config.getCapacity());
> +        resequencer.setRejectOld(config.getRejectOld());
>          if (config.getIgnoreInvalidExchanges() != null) {
>              
> resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
>          }
>
> Modified: 
> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- 
> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>  (original)
> +++ 
> camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
>  Mon Jul 16 17:51:17 2012
> @@ -19,6 +19,7 @@ package org.apache.camel.model.config;
>  import javax.xml.bind.annotation.XmlAccessType;
>  import javax.xml.bind.annotation.XmlAccessorType;
>  import javax.xml.bind.annotation.XmlAttribute;
> +import javax.xml.bind.annotation.XmlElement;
>  import javax.xml.bind.annotation.XmlRootElement;
>  import javax.xml.bind.annotation.XmlTransient;
>
> @@ -41,6 +42,8 @@ public class StreamResequencerConfig ext
>      private Boolean ignoreInvalidExchanges;
>      @XmlTransient
>      private ExpressionResultComparator comparator;
> +    @XmlElement
> +    private Boolean rejectOld;
>
>      /**
>       * Creates a new {@link StreamResequencerConfig} instance using default
> @@ -123,5 +126,13 @@ public class StreamResequencerConfig ext
>      public void setComparator(ExpressionResultComparator comparator) {
>          this.comparator = comparator;
>      }
> -
> +
> +    public void setRejectOld(boolean value) {
> +        this.rejectOld = value;
> +    }
> +
> +    public Boolean getRejectOld() {
> +        return rejectOld;
> +    }
> +
>  }
>
> Modified: 
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- 
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>  (original)
> +++ 
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
>  Mon Jul 16 17:51:17 2012
> @@ -141,6 +141,10 @@ public class StreamResequencer extends S
>          return ignoreInvalidExchanges;
>      }
>
> +    public void setRejectOld(Boolean rejectOld) {
> +        engine.setRejectOld(rejectOld);
> +    }
> +
>      /**
>       * Sets whether to ignore invalid exchanges which cannot be used by this 
> stream resequencer.
>       * <p/>
>
> Added: 
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java?rev=1362163&view=auto
> ==============================================================================
> --- 
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>  (added)
> +++ 
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
>  Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,32 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor.resequencer;
> +
> +import org.apache.camel.RuntimeCamelException;
> +
> +/**
> + * An exception thrown if message is rejected by the resequencer
> + *
> + * @version
> + */
> +public class MessageRejectedException extends RuntimeCamelException {
> +    private static final long serialVersionUID = 5755929795399134568L;
> +
> +    public MessageRejectedException(String message) {
> +        super(message);
> +    }
> +}
> \ No newline at end of file
>
> Modified: 
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362163&r1=1362162&r2=1362163&view=diff
> ==============================================================================
> --- 
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>  (original)
> +++ 
> camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
>  Mon Jul 16 17:51:17 2012
> @@ -87,6 +87,11 @@ public class ResequencerEngine<E> {
>      private SequenceSender<E> sequenceSender;
>
>      /**
> +     * Indicates whether an error should be thrown if message older (based 
> on Comparator) than the last delivered message is received.
> +     */
> +    private Boolean rejectOld;
> +
> +    /**
>       * Creates a new resequencer instance with a default timeout of 2000
>       * milliseconds.
>       *
> @@ -136,6 +141,14 @@ public class ResequencerEngine<E> {
>          this.timeout = timeout;
>      }
>
> +    public Boolean getRejectOld() {
> +        return rejectOld;
> +    }
> +
> +    public void setRejectOld(Boolean rejectOld) {
> +        this.rejectOld = rejectOld;
> +    }
> +
>      /**
>       * Returns the sequence sender.
>       *
> @@ -209,6 +222,9 @@ public class ResequencerEngine<E> {
>              // nothing to schedule
>          } else if (sequence.predecessor(element) != null) {
>              // nothing to schedule
> +        } else if (rejectOld != null && rejectOld.booleanValue() && 
> beforeLastDelivered(element)) {
> +            throw new MessageRejectedException("rejecting message [" + 
> element.getObject()
> +                    + "], it should have been sent before the last delivered 
> message [" + lastDelivered.getObject() + "]");
>          } else {
>              element.schedule(defineTimeout());
>          }
> @@ -283,6 +299,22 @@ public class ResequencerEngine<E> {
>      }
>
>      /**
> +     * Retuns <code>true</code> if the given element is before the last 
> delivered element.
> +     *
> +     * @param element an element.
> +     * @return <code>true</code> if the given element is before the last 
> delivered element.
> +     */
> +    private boolean beforeLastDelivered(Element<E> element) {
> +        if (lastDelivered == null) {
> +            return false;
> +        }
> +        if (sequence.comparator().compare(element, lastDelivered) < 0) {
> +            return true;
> +        }
> +        return false;
> +    }
> +
> +    /**
>       * Creates a timeout task based on the timeout setting of this 
> resequencer.
>       *
>       * @return a new timeout task.
>
> Added: 
> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
> ==============================================================================
> --- 
> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>  (added)
> +++ 
> camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
>  Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,93 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.processor.resequencer.MessageRejectedException;
> +
> +/**
> + *
> + */
> +public class ResequenceStreamRejectOldExchangesTest extends 
> ContextTestSupport {
> +
> +    public void testInSequenceAfterTimeout() throws Exception {
> +        getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C", 
> "E");
> +        getMockEndpoint("mock:error").expectedMessageCount(0);
> +
> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> +        Thread.sleep(1100);
> +        template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    public void testDuplicateAfterTimeout() throws Exception {
> +        getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C");
> +        getMockEndpoint("mock:error").expectedMessageCount(0);
> +
> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> +        Thread.sleep(1100);
> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    public void testOutOfSequenceAfterTimeout() throws Exception {
> +        getMockEndpoint("mock:result").expectedBodiesReceived("A", "C", "D");
> +        getMockEndpoint("mock:error").expectedBodiesReceived("B");
> +
> +        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> +        Thread.sleep(1100);
> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    public void testOutOfSequenceAfterTimeout2() throws Exception {
> +        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
> +        getMockEndpoint("mock:error").expectedBodiesReceived("A");
> +
> +        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
> +        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
> +        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
> +        Thread.sleep(1100);
> +        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +
> +                from("direct:start")
> +                        
> .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
> +                        
> .resequence(header("seqno")).stream().timeout(1000).rejectOld()
> +                        .to("mock:result");
> +            }
> +        };
> +    }
> +}
>
> Added: 
> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
> ==============================================================================
> --- 
> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>  (added)
> +++ 
> camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
>  Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,32 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.spring.processor;
> +
> +import org.apache.camel.CamelContext;
> +import org.apache.camel.processor.ResequenceStreamRejectOldExchangesTest;
> +import org.apache.camel.processor.ResequencerTest;
> +
> +import static 
> org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
> +
> +/**
> + * @version
> + */
> +public class SpringResequenceStreamRejectOldExchangesTest extends 
> ResequenceStreamRejectOldExchangesTest {
> +    protected CamelContext createCamelContext() throws Exception {
> +        return createSpringCamelContext(this, 
> "org/apache/camel/spring/processor/resequencerRejectOld.xml");
> +    }
> +}
> \ No newline at end of file
>
> Added: 
> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362163&view=auto
> ==============================================================================
> --- 
> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
>  (added)
> +++ 
> camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
>  Mon Jul 16 17:51:17 2012
> @@ -0,0 +1,42 @@
> +<?xml version="1.0" encoding="UTF-8"?>
> +<!--
> +    Licensed to the Apache Software Foundation (ASF) under one or more
> +    contributor license agreements.  See the NOTICE file distributed with
> +    this work for additional information regarding copyright ownership.
> +    The ASF licenses this file to You under the Apache License, Version 2.0
> +    (the "License"); you may not use this file except in compliance with
> +    the License.  You may obtain a copy of the License at
> +
> +    http://www.apache.org/licenses/LICENSE-2.0
> +
> +    Unless required by applicable law or agreed to in writing, software
> +    distributed under the License is distributed on an "AS IS" BASIS,
> +    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +    See the License for the specific language governing permissions and
> +    limitations under the License.
> +-->
> +<beans xmlns="http://www.springframework.org/schema/beans";
> +       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
> +       xsi:schemaLocation="
> +       http://www.springframework.org/schema/beans 
> http://www.springframework.org/schema/beans/spring-beans.xsd
> +       http://camel.apache.org/schema/spring 
> http://camel.apache.org/schema/spring/camel-spring.xsd";>
> +
> +    <camelContext xmlns="http://camel.apache.org/schema/spring";>
> +        <route>
> +            <from uri="direct:start"/>
> +            <onException>
> +                
> <exception>org.apache.camel.processor.resequencer.MessageRejectedException</exception>
> +                <handled><constant>true</constant></handled>
> +                <to uri="mock:error"/>
> +            </onException>
> +            <resequence>
> +                <stream-config capacity="100" timeout="1000">
> +                    <rejectOld>true</rejectOld>
> +                </stream-config>
> +                <header>seqno</header>
> +                <to uri="mock:result"/>
> +            </resequence>
> +        </route>
> +    </camelContext>
> +
> +</beans>
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cib...@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Reply via email to