Repository: camel Updated Branches: refs/heads/master e8c385d35 -> 7593d1366
CAMEL-11446: Use awaitility for testing where we otherwise use thread sleep which can be sped up. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7593d136 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7593d136 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7593d136 Branch: refs/heads/master Commit: 7593d1366334d6f3e9763d937ccdfe6fc32221a4 Parents: e8c385d Author: Claus Ibsen <[email protected]> Authored: Sun Jul 16 13:50:58 2017 +0200 Committer: Claus Ibsen <[email protected]> Committed: Sun Jul 16 13:50:58 2017 +0200 ---------------------------------------------------------------------- .../apache/camel/model/ResequenceDefinition.java | 18 ++++++++++++++++++ .../model/config/StreamResequencerConfig.java | 14 ++++++++++++++ .../apache/camel/processor/StreamResequencer.java | 10 +++++++--- ...equenceBatchNotIgnoreInvalidExchangesTest.java | 2 +- ...esequenceStreamIgnoreInvalidExchangesTest.java | 2 +- ...quenceStreamNotIgnoreInvalidExchangesTest.java | 2 +- .../ResequenceStreamRejectOldExchangesTest.java | 2 +- .../apache/camel/processor/ResequencerTest.java | 2 +- .../camel/processor/StreamResequencerTest.java | 2 +- .../processor/aggregator/AggregatorTest.java | 6 +++--- .../resequencer/ResequencerBatchOrderTest.java | 2 +- .../apache/camel/spring/processor/resequencer.xml | 1 + .../spring/processor/resequencerRejectOld.xml | 2 +- .../camel/spring/processor/streamResequencer.xml | 2 +- 14 files changed, 52 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java index 
ef6a423..08df767 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java @@ -154,6 +154,21 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti } /** + * Sets the interval in milli seconds the stream resequencer will at most wait + * while waiting for condition of being able to deliver. + * + * @param deliveryAttemptInterval interval in millis + * @return the builder + */ + public ResequenceDefinition deliveryAttemptInterval(long deliveryAttemptInterval) { + if (streamConfig == null) { + throw new IllegalStateException("deliveryAttemptInterval() only supported for stream resequencer"); + } + streamConfig.setDeliveryAttemptInterval(deliveryAttemptInterval); + return this; + } + + /** * Sets the rejectOld flag to throw an error when a message older than the last delivered message is processed * @return the builder */ @@ -405,6 +420,9 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), internal, comparator, expression); resequencer.setTimeout(config.getTimeout()); + if (config.getDeliveryAttemptInterval() != null) { + resequencer.setDeliveryAttemptInterval(config.getDeliveryAttemptInterval()); + } resequencer.setCapacity(config.getCapacity()); resequencer.setRejectOld(config.getRejectOld()); if (config.getIgnoreInvalidExchanges() != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java b/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java index 37d91f2..7d56d78 100644 --- 
a/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java +++ b/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java @@ -37,6 +37,8 @@ public class StreamResequencerConfig extends ResequencerConfig { private Integer capacity; @XmlAttribute @Metadata(defaultValue = "1000") private Long timeout; + @XmlAttribute @Metadata(defaultValue = "1000") + private Long deliveryAttemptInterval; @XmlAttribute private Boolean ignoreInvalidExchanges; @XmlTransient @@ -148,6 +150,18 @@ public class StreamResequencerConfig extends ResequencerConfig { this.timeout = timeout; } + public Long getDeliveryAttemptInterval() { + return deliveryAttemptInterval; + } + + /** + * Sets the interval in milli seconds the stream resequencer will at most wait + * while waiting for condition of being able to deliver. + */ + public void setDeliveryAttemptInterval(Long deliveryAttemptInterval) { + this.deliveryAttemptInterval = deliveryAttemptInterval; + } + public Boolean getIgnoreInvalidExchanges() { return ignoreInvalidExchanges; } http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java index c3d7e70..0df5400 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java @@ -69,7 +69,6 @@ import org.slf4j.LoggerFactory; */ public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, AsyncProcessor, Navigate<Processor>, Traceable, IdAware { - private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L; private static final Logger LOG = LoggerFactory.getLogger(StreamResequencer.class); private String 
id; @@ -81,7 +80,8 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender< private Delivery delivery; private int capacity; private boolean ignoreInvalidExchanges; - + private long deliveryAttemptInterval = 1000L; + /** * Creates a new {@link StreamResequencer} instance. * @@ -149,6 +149,10 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender< engine.setTimeout(timeout); } + public void setDeliveryAttemptInterval(long deliveryAttemptInterval) { + this.deliveryAttemptInterval = deliveryAttemptInterval; + } + public boolean isIgnoreInvalidExchanges() { return ignoreInvalidExchanges; } @@ -272,7 +276,7 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender< try { deliveryRequestLock.lock(); try { - deliveryRequestCondition.await(DELIVERY_ATTEMPT_INTERVAL, TimeUnit.MILLISECONDS); + deliveryRequestCondition.await(deliveryAttemptInterval, TimeUnit.MILLISECONDS); } finally { deliveryRequestLock.unlock(); } http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java b/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java index ac408e5..b88284c 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java @@ -29,7 +29,7 @@ public class ResequenceBatchNotIgnoreInvalidExchangesTest extends ResequenceStre @Override public void configure() throws Exception { from("direct:start") - .resequence(header("seqno")).batch().timeout(1000) + .resequence(header("seqno")).batch().timeout(50) .to("mock:result"); } }; 
http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java b/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java index 4aab1aa..0c9dac9 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java @@ -79,7 +79,7 @@ public class ResequenceStreamIgnoreInvalidExchangesTest extends ContextTestSuppo public void configure() throws Exception { // START SNIPPET: e1 from("direct:start") - .resequence(header("seqno")).stream().timeout(250) + .resequence(header("seqno")).stream().timeout(50).deliveryAttemptInterval(10) // ignore invalid exchanges (they are discarded) .ignoreInvalidExchanges() .to("mock:result"); http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java b/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java index c0307a2..a07c92f 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java @@ -95,7 +95,7 @@ public class ResequenceStreamNotIgnoreInvalidExchangesTest extends ContextTestSu @Override public void configure() throws Exception { from("direct:start") - 
.resequence(header("seqno")).stream().timeout(1000) + .resequence(header("seqno")).stream().timeout(50).deliveryAttemptInterval(10) .to("mock:result"); } }; http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java b/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java index 2fe11e8..1e4cd0c 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java @@ -86,7 +86,7 @@ public class ResequenceStreamRejectOldExchangesTest extends ContextTestSupport { from("direct:start") .onException(MessageRejectedException.class).maximumRedeliveries(0).handled(true).to("mock:error").end() - .resequence(header("seqno")).stream().capacity(3).rejectOld().timeout(50) // use low timeout to run faster + .resequence(header("seqno")).stream().capacity(3).rejectOld().timeout(50).deliveryAttemptInterval(10) // use low timeout to run faster .to("mock:result"); } }; http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java index 2fd4f02..38d935a 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java @@ -61,7 +61,7 @@ public class ResequencerTest extends ContextTestSupport { public void configure() { // START SNIPPET: example 
from("direct:start") - .resequence().body() + .resequence().body().timeout(50) .to("mock:result"); // END SNIPPET: example } http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java b/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java index fa232c4..8654c0c 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java @@ -91,7 +91,7 @@ public class StreamResequencerTest extends ContextTestSupport { return new RouteBuilder() { public void configure() { // START SNIPPET: example - from("direct:start").resequence(header("seqnum")).stream().to("mock:result"); + from("direct:start").resequence(header("seqnum")).stream().timeout(100).deliveryAttemptInterval(10).to("mock:result"); // END SNIPPET: example } }; http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java index bd3dea0..12d07c7 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java @@ -82,16 +82,16 @@ public class AggregatorTest extends ContextTestSupport { // START SNIPPET: ex // in this route we aggregate all from direct:state based on the header id cheese from("direct:start") - .aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionTimeout(1000L) + 
.aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionTimeout(100).completionTimeoutCheckerInterval(10) .to("mock:result"); from("seda:header").setHeader("visited", constant(true)) - .aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionTimeout(1000L) + .aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionTimeout(100).completionTimeoutCheckerInterval(10) .to("mock:result"); // in this sample we aggregate with a completion predicate from("direct:predicate") - .aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionTimeout(1000L) + .aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionTimeout(100).completionTimeoutCheckerInterval(10) .completionPredicate(header("cheese").isEqualTo(123)) .to("mock:result"); // END SNIPPET: ex http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerBatchOrderTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerBatchOrderTest.java b/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerBatchOrderTest.java index e677b9c..55c8399 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerBatchOrderTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerBatchOrderTest.java @@ -30,7 +30,7 @@ public class ResequencerBatchOrderTest extends ContextTestSupport { return new RouteBuilder() { public void configure() { from("direct:start") - .resequence(body()).batch().size(2).timeout(3000) + .resequence(body()).batch().size(2).timeout(50) .to("mock:result"); } }; http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencer.xml 
---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencer.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencer.xml index 08be092..95a9d84 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencer.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencer.xml @@ -29,6 +29,7 @@ <route> <from uri="direct:start"/> <resequence> + <batch-config batchTimeout="100"/> <simple>${body}</simple> <to uri="mock:result"/> </resequence> http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml index 23310c4..8213e3d 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml @@ -32,7 +32,7 @@ <to uri="mock:error"/> </onException> <resequence> - <stream-config capacity="3" timeout="1000" rejectOld="true"/> + <stream-config capacity="3" timeout="1000" rejectOld="true" deliveryAttemptInterval="10"/> <header>seqno</header> <to uri="mock:result"/> </resequence> http://git-wip-us.apache.org/repos/asf/camel/blob/7593d136/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/streamResequencer.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/streamResequencer.xml 
b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/streamResequencer.xml index 27d0ea2..aac825e 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/streamResequencer.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/streamResequencer.xml @@ -29,7 +29,7 @@ <route> <from uri="direct:start"/> <resequence> - <stream-config/> + <stream-config timeout="100" deliveryAttemptInterval="10"/> <simple>${header.seqnum}</simple> <to uri="mock:result" /> </resequence>
