This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 913c6ff  CAMEL-16861: Cleanup and update EIP docs
913c6ff is described below

commit 913c6ffa3c95c75cb52f1920c52594c564578d44
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Oct 12 14:09:11 2021 +0200

    CAMEL-16861: Cleanup and update EIP docs
---
 .../docs/modules/eips/pages/resequence-eip.adoc    | 312 +++++++++++++++------
 .../docs/modules/eips/pages/stream-config-eip.adoc |   2 +-
 .../apache/camel/model/config/stream-config.json   |   8 +-
 .../model/config/StreamResequencerConfig.java      |  15 +-
 .../org/apache/camel/processor/Resequencer.java    |   1 -
 .../apache/camel/processor/StreamResequencer.java  |   8 +-
 .../processor/resequencer/ResequencerEngine.java   |   2 +-
 .../resequencer/SequenceElementComparator.java     |   4 +-
 .../processor/resequencer/SequenceSender.java      |   1 -
 .../processor/resequencer/IntegerComparator.java   |   4 +-
 10 files changed, 248 insertions(+), 109 deletions(-)

diff --git 
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resequence-eip.adoc 
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resequence-eip.adoc
index 75da679..f57e681 100644
--- 
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resequence-eip.adoc
+++ 
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resequence-eip.adoc
@@ -5,26 +5,44 @@
 :since: 
 :supportlevel: Stable
 
-The http://www.enterpriseintegrationpatterns.com/Resequencer.html[Resequencer] 
from the xref:components:eips:enterprise-integration-patterns.adoc[EIP 
patterns] allows you to reorganise messages based on some comparator. +
-By default in Camel we use an Expression to create the comparator; so that you 
can compare by a message header or the body or a piece of a message etc.
+Camel supports the
+http://www.enterpriseintegrationpatterns.com/Resequencer.html[Resequencer]
+from the xref:enterprise-integration-patterns.adoc[EIP patterns].
+
+How can we get a stream of related but out-of-sequence messages back into the 
correct order?
 
 image::eip/Resequencer.gif[image]
 
+Use a stateful filter, a Resequencer, to collect and re-order messages so that 
they can be published to the output channel in a specified order.
+
+The Resequencer implementation in Camel uses an xref:latest@manual:ROOT:expression.adoc[Expression]
+as the `Comparator` to re-order the messages. With an expression, the messages
+can easily be re-ordered by a message header or another piece of the message.
+
 == Options
 
 // eip options: START
 include::partial$eip-options.adoc[]
 // eip options: END
 
-Camel supports two resequencing algorithms:
+Camel supports two re-sequencing algorithms:
 
-* *Batch resequencing* collects messages into a batch, sorts the messages and 
sends them to their output.
-* *Stream resequencing* re-orders (continuous) message streams based on the 
detection of gaps between messages.
+* xref:batch-config-eip.adoc[Batch Resequencing] - *Default mode*: collects 
messages into a batch, sorts the messages and sends them to their output.
+* xref:stream-config-eip.adoc[Stream Resequencing] - re-orders (continuous) 
message streams based on the detection of gaps between messages.
 
-By default the Resequencer does not support duplicate messages and will only 
keep the last message, in case a message arrives with the same message 
expression. However in the batch mode you can enable it to allow duplicates.
+By default, the Resequencer does not support duplicate messages: if several messages evaluate to the same expression value, only the last one is kept.
+However, in batch mode you can allow duplicates.
 
 == Batch Resequencing
-The following example shows how to use the batch-processing resequencer so 
that messages are sorted in order of the *body()* expression. That is messages 
are collected into a batch (either by a maximum number of messages per batch or 
using a timeout) then they are sorted in order and then sent out to their 
output.
+
+The following example shows how to use the Resequencer in 
xref:batch-config-eip.adoc[batch mode] (default),
+so that messages are sorted in order of the message body.
+
+That is, messages are collected into a batch (either by a maximum number of messages per batch or using a timeout),
+then they are sorted and sent out to continue being routed.
+
+In the example below we re-order the messages based on the content of the message body.
+The default batch mode collects up to 100 messages per batch, or times out every second.
 
 [source,java]
 ----
@@ -33,7 +51,8 @@ from("direct:start")
     .to("mock:result");
 ----
 
-This is equivalent to
+This is equivalent to:
+
 [source,java]
 ----
 from("direct:start")
@@ -41,7 +60,21 @@ from("direct:start")
     .to("mock:result");
 ----
 
-The batch-processing resequencer can be further configured via the `size()` 
and `timeout()` methods.
+And in XML:
+
+[source,xml]
+----
+<route>
+    <from uri="direct:start"/>
+    <resequence>
+        <simple>${body}</simple>
+        <to uri="mock:result"/>
+    </resequence>
+</route>
+----
+
+The batch resequencer can be further configured via the `size()` and 
`timeout()` methods:
+
 [source,java]
 ----
 from("direct:start")
@@ -49,129 +82,234 @@ from("direct:start")
     .to("mock:result")
 ----
 
-This sets the batch size to 300 and the batch timeout to 4000 ms (by default, 
the batch size is 100 and the timeout is 1000 ms). Alternatively, you can 
provide a configuration object.
+This sets the batch size to 300 and the batch timeout to 4000 ms (by default, 
the batch size is 100 and the timeout is 1000 ms).
+
+And the example in XML DSL:
+
+[source,xml]
+----
+<route>
+    <from uri="direct:start"/>
+    <resequence>
+        <batch-config batchSize="300" batchTimeout="4000"/>
+        <simple>${body}</simple>
+        <to uri="mock:result"/>
+    </resequence>
+</route>
+----
+
+The above example reorders messages in order of their bodies.
+Typically, you'd use a header rather than the body to order things, or maybe a part of the body.
+So you could replace this expression with:
 
 [source,java]
 ----
 from("direct:start")
-    .resequence(body()).batch(new BatchResequencerConfig(300, 4000L))
+    .resequence(header("mySeqNo"))
     .to("mock:result")
 ----
 
-So the above example will reorder messages from endpoint *direct:a* in order 
of their bodies, to the endpoint *mock:result*. +
-Typically you'd use a header rather than the body to order things; or maybe a 
part of the body. So you could replace this expression with
+And in XML:
 
-[source,java]
+[source,xml]
 ----
-resequence(header("mySeqNo"))
+<route>
+    <from uri="direct:start"/>
+    <resequence>
+        <header>mySeqNo</header>
+        <to uri="mock:result"/>
+    </resequence>
+</route>
 ----
 
-for example to reorder messages using a custom sequence number in the header 
`mySeqNo`. +
-You can of course use many different Expression languages such as XPath, 
XQuery, SQL or various Scripting Languages.
+This reorders messages using a custom sequence number from the header `mySeqNo`.
+
+=== Allow Duplicates
+
+When duplicates are allowed, the Resequencer retains duplicate messages instead
+of keeping only the last duplicated message.
+
+In batch mode, you can turn on duplicates as follows:
+
+[source,java]
+----
+from("direct:start")
+    .resequence(header("mySeqNo")).allowDuplicates()
+    .to("mock:result")
+----
 
-And an example in XML
+And in XML:
 
 [source,xml]
 ----
-<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
-  <route>
-    <from uri="direct:start" />
+<route>
+    <from uri="direct:start"/>
     <resequence>
-      <simple>body</simple>
-      <to uri="mock:result" />
-      <!--
-        batch-config can be ommitted for default (batch) resequencer settings
-      -->
-      <batch-config batchSize="300" batchTimeout="4000" />
+        <batch-config allowDuplicates="true"/>
+        <header>mySeqNo</header>
+        <to uri="mock:result"/>
     </resequence>
-  </route>
-</camelContext>
+</route>
 ----
 
-== Allow Duplicates
-In the `batch` mode, you can now allow duplicates. In Java DSL there is a 
`allowDuplicates()` method and in Spring XML there is an `allowDuplicates=true` 
attribute on the `<batch-config/>` you can use to enable it.
+=== Reverse Ordering
 
-== Reverse
-In the `batch` mode, you can now reverse the expression ordering. By default 
the order is based on 0..9,A..Z, which would let messages with low numbers be 
ordered first, and thus also outgoing first. In some cases you want to reverse 
order, which is now possible. +
-In Java DSL there is a `reverse()` method and in Spring XML there is an 
`reverse=true` attribute on the `<batch-config/>` you can use to enable it.
+You can reverse the expression ordering.
+By default, the order is based on 0..9,A..Z, so messages with low numbers are ordered first and are therefore also sent out first.
+In some cases you want to reverse the ordering.
 
-== Resequence JMS messages based on JMSPriority
-It's now much easier to use the Resequencer to resequence messages from JMS 
queues based on JMSPriority. For that to work you need to use the two new 
options `allowDuplicates` and `reverse`.
+In batch mode, you can turn on reverse as follows:
 
 [source,java]
 ----
-from("jms:queue:foo")
-    // sort by JMSPriority by allowing duplicates (message can have same JMSPriority)
-    // and use reverse ordering so 9 is first output (most important), and 0 is last
-    // use batch mode and fire every 3th second
-    .resequence(header("JMSPriority")).batch().timeout(3000).allowDuplicates().reverse()
-    .to("mock:result");
+from("direct:start")
+    .resequence(header("mySeqNo")).reverse()
+    .to("mock:result")
 ----
 
-Notice this is *only* possible in the `batch` mode of the Resequencer.
+And in XML:
 
-== Ignore invalid exchanges
+[source,xml]
+----
+<route>
+    <from uri="direct:start"/>
+    <resequence>
+        <batch-config reverse="true"/>
+        <header>mySeqNo</header>
+        <to uri="mock:result"/>
+    </resequence>
+</route>
+----
 
-The Resequencer EIP throws a `CamelExchangeException` if the incoming Exchange 
is not valid for the resequencer - ie. the expression cannot be evaluated, such 
as a missing header.
-You can use the option `ignoreInvalidExchanges` to ignore these exceptions 
which means the Resequencer will then skip the invalid Exchange.
+=== Ignoring invalid messages
+
+The Resequencer throws a `CamelExchangeException` if the incoming Exchange is not valid for the Resequencer,
+for example when the expression cannot be evaluated because a header is missing.
+
+You can ignore these kinds of errors and let the Resequencer skip the invalid Exchange.
+
+In Java DSL:
 
 [source,java]
 ----
 from("direct:start")
-    .resequence(header("seqno")).batch().timeout(1000)
+    .resequence(header("seqno")).batch()
         // ignore invalid exchanges (they are discarded)
         .ignoreInvalidExchanges()
     .to("mock:result");
 ----
 
-This option is available for both batch and stream resequencer.
+And in XML:
+
+[source,xml]
+----
+<route>
+    <from uri="direct:start"/>
+    <resequence>
+        <batch-config ignoreInvalidExchanges="true"/>
+        <header>seqno</header>
+        <to uri="mock:result"/>
+    </resequence>
+</route>
+----
+
+This option is available for both batch and stream mode.
 
-== Reject Old Exchanges
+=== Resequence JMS messages based on JMSPriority
 
-This option can be used to prevent out of order messages from being sent 
regardless of the event that delivered messages downstream (capacity, timeout, 
etc). If enabled using `rejectOld()`, the Resequencer will throw a 
`MessageRejectedException` when an incoming Exchange is "older" (based on the 
Comparator) than the last delivered message. This provides an extra level of 
control with regards to delayed message ordering.
+It's now much easier to use the Resequencer to resequence messages from JMS 
queues based on JMSPriority. For that to work you need to use the two new 
options `allowDuplicates` and `reverse`.
 
 [source,java]
 ----
-from("direct:start")
-    .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
-    .resequence(header("seqno")).stream().timeout(1000).rejectOld()
+from("jms:queue:foo")
+    // sort by JMSPriority by allowing duplicates (messages can have the same JMSPriority)
+    // and use reverse ordering so 9 is first output (most important), and 0 is last
+    // use batch mode and fire every 3 seconds
+    .resequence(header("JMSPriority")).batch().timeout(3000).allowDuplicates().reverse()
     .to("mock:result");
 ----
 
-This option is available for the stream resequencer only.
+Notice this is *only* possible in the `batch` mode of the Resequencer.
+
 
 == Stream Resequencing
-The next example shows how to use the stream-processing resequencer. Messages 
are re-ordered based on their sequence numbers given by a seqnum header using 
gap detection and timeouts on the level of individual messages.
+
+In streaming mode, the Resequencer sends out messages as soon as possible, that is,
+when a message with the next expected sequence number has arrived.
+
+The streaming mode requires the messages to be re-ordered based on integral numeric values
+that are ordered 1,2,3...N.
+
+The following example uses the header seqnum for the ordering:
 
 [source,java]
 ----
-from("direct:start").resequence(header("seqnum")).stream().to("mock:result");
+from("direct:start")
+  .resequence(header("seqnum")).stream()
+  .to("mock:result");
 ----
 
-The stream-processing resequencer can be further configured via the 
`capacity()` and `timeout()` methods.
+And in XML:
 
-[source,java]
+[source,xml]
 ----
-from("direct:start")
-    .resequence(header("seqnum")).stream().capacity(5000).timeout(4000L)
-    .to("mock:result")
+<route>
+    <from uri="direct:start"/>
+    <resequence>
+        <stream-config/>
+        <header>seqnum</header>
+        <to uri="mock:result"/>
+    </resequence>
+</route>
 ----
 
-This sets the resequencer's capacity to 5000 and the timeout to 4000 ms (by 
default, the capacity is 1000 and the timeout is 1000 ms). Alternatively, you 
can provide a configuration object.
+The Resequencer keeps pending messages in a backlog.
+The default capacity is 1000 elements, which can be configured:
 
 [source,java]
 ----
 from("direct:start")
-    .resequence(header("seqnum")).stream(new StreamResequencerConfig(5000, 4000L))
+    .resequence(header("seqnum")).stream().capacity(5000).timeout(4000)
     .to("mock:result")
 ----
 
+And in XML:
+
+[source,xml]
+----
+<route>
+    <from uri="direct:start"/>
+    <resequence>
+        <stream-config capacity="5000" timeout="4000"/>
+        <header>seqnum</header>
+        <to uri="mock:result"/>
+    </resequence>
+</route>
+----
+
+This uses a capacity of 5000 elements, and the timeout has been set to 4 seconds.
+In case of a timeout, the Resequencer disregards the current expected sequence number
+and moves to the next expected number.
+
+=== How streaming mode works
+
 The stream-processing resequencer algorithm is based on the detection of gaps 
in a message stream rather than on a fixed batch size.
-Gap detection in combination with timeouts removes the constraint of having to 
know the number of messages of a sequence (i.e. the batch size) in advance. 
Messages must contain a unique sequence number for which a predecessor and a 
successor is known. For example a message with the sequence number 3 has a 
predecessor message with the sequence number 2 and a successor message with the 
sequence number 4. The message sequence 2,3,5 has a gap because the successor 
of 3 is missing. The resequ [...]
+Gap detection in combination with timeouts removes the constraint of having to 
know the number of messages of a sequence (i.e. the batch size) in advance.
+Messages must contain a unique sequence number for which a predecessor and a successor are known.
 
-If the maximum time difference between messages (with successor/predecessor 
relationship with respect to the sequence number) in a message stream is known, 
then the resequencer's timeout parameter should be set to this value. In this 
case it is guaranteed that all messages of a stream are delivered in correct 
order to the next processor. The lower the timeout value is compared to the 
out-of-sequence time difference the higher is the probability for 
out-of-sequence messages delivered by t [...]
+For example, a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4.
+The message sequence 2,3,5 has a gap because the successor of 3 is missing. The resequencer therefore has to retain message 5 until message 4 arrives (or a timeout occurs).
 
-By default, the stream resequencer expects long sequence numbers but other 
sequence numbers types can be supported as well by providing a custom 
expression.
+If the maximum time difference between messages (with successor/predecessor 
relationship with respect to the sequence number)
+in a message stream is known, then the Resequencer timeout parameter should be 
set to this value.
+
+In this case it is guaranteed that all messages of a stream are delivered in correct order to the next processor.
+The lower the timeout value is compared to the out-of-sequence time difference, the higher the probability that this Resequencer delivers messages out of sequence.
+Large timeout values should be supported by sufficiently high capacity values. The capacity parameter is used to prevent the Resequencer from running out of memory.
+
+=== Using custom streaming mode sequence expression
+
+By default, the stream Resequencer expects long sequence numbers, but other sequence number types can be supported as well by providing a custom expression.
 
 [source,java]
 ----
@@ -193,47 +331,49 @@ public class MyFileNameExpression implements Expression {
         Object result = evaluate(exchange);
         return exchange.getContext().getTypeConverter().convertTo(type, result);
     }
-
 }
-
-from("direct:start")
-    .resequence(new MyFileNameExpression()).stream().timeout(100).to("mock:result");
 ----
 
-or custom comparator via the `comparator()` method
+And then you can use this expression in a Camel route:
 
 [source,java]
 ----
-ExpressionResultComparator<Exchange> comparator = new MyComparator();
 from("direct:start")
-    .resequence(header("seqnum")).stream().comparator(comparator)
+    .resequence(new MyFileNameExpression()).stream().timeout(2000)
     .to("mock:result");
 ----
 
-or via a `StreamResequencerConfig` object.
+=== Rejecting old messages
+
+Rejecting old messages is used to prevent out of order messages from being 
sent,
+regardless of the event that delivered messages downstream (capacity, timeout, 
etc).
+
+If enabled, the Resequencer will throw a `MessageRejectedException`
+when an incoming Exchange is _older_ (based on the `Comparator`) than the last 
delivered message.
+
+This provides an extra level of control in regard to delayed message ordering.
+
+In the example below old messages are rejected:
 
 [source,java]
 ----
-ExpressionResultComparator<Exchange> comparator = new MyComparator();
-StreamResequencerConfig config = new StreamResequencerConfig(100, 1000L, comparator);
-
 from("direct:start")
-    .resequence(header("seqnum")).stream(config)
+    .resequence(header("seqno")).stream().timeout(1000).rejectOld()
     .to("mock:result");
 ----
 
-And an example in XML
+And in XML DSL:
 
 [source,xml]
 ----
-<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
-  <route>
+<route>
     <from uri="direct:start"/>
     <resequence>
-      <simple>in.header.seqnum</simple>
-      <to uri="mock:result" />
-      <stream-config capacity="5000" timeout="4000"/>
+        <stream-config rejectOld="true" timeout="1000"/>
+        <header>seqno</header>
+        <to uri="mock:result"/>
     </resequence>
-  </route>
-</camelContext>
+</route>
 ----
+
+If an old message is detected then Camel throws `MessageRejectedException`.
diff --git 
a/core/camel-core-engine/src/main/docs/modules/eips/pages/stream-config-eip.adoc
 
b/core/camel-core-engine/src/main/docs/modules/eips/pages/stream-config-eip.adoc
index 9220f1b..7ae6a9c 100644
--- 
a/core/camel-core-engine/src/main/docs/modules/eips/pages/stream-config-eip.adoc
+++ 
b/core/camel-core-engine/src/main/docs/modules/eips/pages/stream-config-eip.adoc
@@ -5,7 +5,7 @@
 :since: 
 :supportlevel: Stable
 
-Stream-processing resequence EIP
+Configuration for the xref:resequence-eip.adoc[Resequence EIP] in stream mode.
 
 // eip options: START
 include::partial$eip-options.adoc[]
diff --git 
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/config/stream-config.json
 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/config/stream-config.json
index bfafe09..7204003 100644
--- 
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/config/stream-config.json
+++ 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/config/stream-config.json
@@ -11,11 +11,11 @@
     "output": false
   },
   "properties": {
-    "capacity": { "kind": "attribute", "displayName": "Capacity", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "100", "description": 
"Sets the capacity of the resequencer's inbound queue." },
-    "timeout": { "kind": "attribute", "displayName": "Timeout", "required": 
false, "type": "duration", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "1s", "description": "Sets 
minimum time to wait for missing elements (messages)." },
-    "deliveryAttemptInterval": { "kind": "attribute", "displayName": "Delivery 
Attempt Interval", "required": false, "type": "duration", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "1s", "description": "Sets the interval in milli seconds the 
stream resequencer will at most wait while waiting for condition of being able 
to deliver." },
+    "capacity": { "kind": "attribute", "displayName": "Capacity", "required": 
false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "1000", "description": 
"Sets the capacity of the resequencer inbound queue." },
+    "timeout": { "kind": "attribute", "displayName": "Timeout", "required": 
false, "type": "duration", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "1s", "description": "Sets 
minimum time (milliseconds) to wait for missing elements (messages)." },
+    "deliveryAttemptInterval": { "kind": "attribute", "displayName": "Delivery 
Attempt Interval", "required": false, "type": "duration", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "1s", "description": "Sets the interval in milliseconds the 
stream resequencer will at most wait while waiting for condition of being able 
to deliver." },
     "ignoreInvalidExchanges": { "kind": "attribute", "displayName": "Ignore 
Invalid Exchanges", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether to ignore invalid exchanges" },
-    "comparatorRef": { "kind": "attribute", "displayName": "Comparator Ref", 
"required": false, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
use a custom comparator" },
+    "comparatorRef": { "kind": "attribute", "displayName": "Comparator Ref", 
"required": false, "type": "string", "javaType": "java.lang.String", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
use a custom comparator as a 
org.apache.camel.processor.resequencer.ExpressionResultComparator type." },
     "rejectOld": { "kind": "attribute", "displayName": "Reject Old", 
"required": false, "type": "boolean", "javaType": "java.lang.Boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "If true, throws an exception when messages older than 
the last delivered message are processed" }
   }
 }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
index de2d682..1e95ef5 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
@@ -33,7 +33,7 @@ import org.apache.camel.spi.Metadata;
 @XmlAccessorType(XmlAccessType.FIELD)
 public class StreamResequencerConfig extends ResequencerConfig {
     @XmlAttribute
-    @Metadata(defaultValue = "100", javaType = "java.lang.Integer")
+    @Metadata(defaultValue = "1000", javaType = "java.lang.Integer")
     private String capacity;
     @XmlAttribute
     @Metadata(defaultValue = "1s", javaType = "java.time.Duration")
@@ -47,6 +47,7 @@ public class StreamResequencerConfig extends 
ResequencerConfig {
     @XmlTransient
     private ExpressionResultComparator comparator;
     @XmlAttribute
+    @Metadata(label = "advanced")
     private String comparatorRef;
     @XmlAttribute
     @Metadata(javaType = "java.lang.Boolean")
@@ -78,7 +79,7 @@ public class StreamResequencerConfig extends 
ResequencerConfig {
      * 
      * @param capacity   capacity of the resequencer's inbound queue.
      * @param timeout    minimum time to wait for missing elements (messages).
-     * @param comparator comparator for sequence comparision
+     * @param comparator comparator for sequence comparison
      */
     public StreamResequencerConfig(int capacity, long timeout, ExpressionResultComparator comparator) {
         this(capacity, timeout, null, comparator);
@@ -103,7 +104,7 @@ public class StreamResequencerConfig extends 
ResequencerConfig {
      * @param capacity   capacity of the resequencer's inbound queue.
      * @param timeout    minimum time to wait for missing elements (messages).
      * @param rejectOld  if true, throws an exception when messages older than the last delivered message are processed
-     * @param comparator comparator for sequence comparision
+     * @param comparator comparator for sequence comparison
      */
     public StreamResequencerConfig(int capacity, long timeout, Boolean rejectOld, ExpressionResultComparator comparator) {
         this.capacity = Integer.toString(capacity);
@@ -128,7 +129,7 @@ public class StreamResequencerConfig extends 
ResequencerConfig {
     }
 
     /**
-     * Sets the capacity of the resequencer's inbound queue.
+     * Sets the capacity of the resequencer inbound queue.
      */
     public void setCapacity(String capacity) {
         this.capacity = capacity;
@@ -139,7 +140,7 @@ public class StreamResequencerConfig extends 
ResequencerConfig {
     }
 
     /**
-     * Sets minimum time to wait for missing elements (messages).
+     * Sets minimum time (milliseconds) to wait for missing elements (messages).
      */
     public void setTimeout(String timeout) {
         this.timeout = timeout;
@@ -150,7 +151,7 @@ public class StreamResequencerConfig extends 
ResequencerConfig {
     }
 
     /**
-     * Sets the interval in milli seconds the stream resequencer will at most wait while waiting for condition of being
+     * Sets the interval in milliseconds the stream resequencer will at most wait while waiting for condition of being
      * able to deliver.
      */
     public void setDeliveryAttemptInterval(String deliveryAttemptInterval) {
@@ -184,7 +185,7 @@ public class StreamResequencerConfig extends 
ResequencerConfig {
     }
 
     /**
-     * To use a custom comparator
+     * To use a custom comparator as a org.apache.camel.processor.resequencer.ExpressionResultComparator type.
      */
     public void setComparatorRef(String comparatorRef) {
         this.comparatorRef = comparatorRef;
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
index 6e0282b..c1ad5e9 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -466,7 +466,6 @@ public class Resequencer extends AsyncProcessorSupport 
implements Navigate<Proce
                             boolean drained = false;
                             while (isInBatchCompleted(queue.size())) {
                                 drained = true;
-                                // TODO id in this branch is 100% null
                                 drainQueueTo(collection, batchSize, id);
                             }
                             if (drained) {
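For readers following the batch-mode changes, the behaviour the updated documentation describes (collect a batch, sort it by the expression value, optionally keep duplicates or reverse the order) can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Camel's implementation: `BatchSortSketch`, `Message` and `sortBatch` are hypothetical names, and delivering duplicates in arrival order is a choice made by the sketch only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical illustration only; this is not Camel's Resequencer class. */
class BatchSortSketch {

    /** A message reduced to its sequence key and body. */
    static final class Message {
        final int key;
        final String body;

        Message(int key, String body) {
            this.key = key;
            this.body = body;
        }
    }

    /** Sorts one completed batch by key and returns the bodies in delivery order. */
    static List<String> sortBatch(List<Message> batch, boolean allowDuplicates, boolean reverse) {
        Comparator<Integer> order = reverse
                ? Comparator.<Integer>reverseOrder()
                : Comparator.<Integer>naturalOrder();
        // key -> bodies seen for that key, in arrival order
        TreeMap<Integer, List<String>> sorted = new TreeMap<>(order);
        for (Message message : batch) {
            List<String> bodies = sorted.computeIfAbsent(message.key, k -> new ArrayList<>());
            if (!allowDuplicates) {
                bodies.clear(); // default behaviour: only the last message per key survives
            }
            bodies.add(message.body);
        }
        List<String> out = new ArrayList<>();
        for (List<String> bodies : sorted.values()) {
            out.addAll(bodies);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Message> batch = Arrays.asList(
                new Message(3, "c"), new Message(1, "a"), new Message(3, "c2"), new Message(2, "b"));
        System.out.println(sortBatch(batch, false, false)); // [a, b, c2]
        System.out.println(sortBatch(batch, true, true));   // [c, c2, b, a]
    }
}
```

With the defaults, the earlier message for key 3 is dropped; with duplicates allowed and reverse ordering, both survive and the highest key goes out first, which is the combination the JMSPriority example in the documentation relies on.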
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
index d80f31e..a7a274a 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -243,7 +243,7 @@ public class StreamResequencer extends AsyncProcessorSupport
             try {
                 Thread.sleep(getTimeout());
             } catch (InterruptedException e) {
-                // we was interrupted so break out
+                // we were interrupted so break out
                 exchange.setException(e);
                 callback.done(true);
                 return true;
@@ -285,8 +285,8 @@ public class StreamResequencer extends AsyncProcessorSupport
 
     class Delivery extends Thread {
 
-        private Lock deliveryRequestLock = new ReentrantLock();
-        private Condition deliveryRequestCondition = deliveryRequestLock.newCondition();
+        private final Lock deliveryRequestLock = new ReentrantLock();
+        private final Condition deliveryRequestCondition = deliveryRequestLock.newCondition();
 
         Delivery() {
             
super(camelContext.getExecutorServiceManager().resolveThreadName("Resequencer 
Delivery"));
@@ -308,7 +308,7 @@ public class StreamResequencer extends AsyncProcessorSupport
                 try {
                     engine.deliver();
                 } catch (Throwable t) {
-                    // a fail safe to handle all exceptions being thrown
+                    // a fail-safe to handle all exceptions being thrown
                     getExceptionHandler().handleException(t);
                 }
             }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
index 7ebf435..e057858 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
@@ -247,7 +247,7 @@ public class ResequencerEngine<E> {
         if (sequence.isEmpty()) {
             return false;
         }
-        // inspect element with lowest sequence value
+        // inspect element with the lowest sequence value
         Element<E> element = sequence.first();
 
         // if element is scheduled do not deliver and return
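The gap-detection idea behind the stream resequencer (hold an element back until its immediate predecessor has been delivered, or until a timeout gives up on the gap) can be sketched in plain Java. This is a simplified illustration and not the `ResequencerEngine` code: `GapDetectionSketch` and its methods are hypothetical, sequence numbers are plain `long` values, and the timeout is simulated by an explicit method call instead of a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical illustration of gap detection; this is not Camel's ResequencerEngine. */
class GapDetectionSketch {
    private final TreeSet<Long> pending = new TreeSet<>();
    private final List<Long> delivered = new ArrayList<>();
    private Long lastDelivered; // null until the first delivery

    /** Buffers a sequence number and delivers whatever is now in order. */
    void insert(long seqNo) {
        pending.add(seqNo);
        deliverSuccessors();
    }

    /** Simulates a timeout: stop waiting for the gap and deliver the lowest pending element. */
    void timeout() {
        if (!pending.isEmpty()) {
            deliverLowest();
            deliverSuccessors();
        }
    }

    List<Long> delivered() {
        return delivered;
    }

    private void deliverSuccessors() {
        // deliver while the lowest pending element directly follows the last delivered one
        while (lastDelivered != null && !pending.isEmpty()
                && pending.first() == lastDelivered + 1) {
            deliverLowest();
        }
    }

    private void deliverLowest() {
        lastDelivered = pending.pollFirst();
        delivered.add(lastDelivered);
    }

    public static void main(String[] args) {
        GapDetectionSketch sketch = new GapDetectionSketch();
        sketch.insert(2);
        sketch.timeout();   // no predecessor known for the first element, so it goes out on timeout
        sketch.insert(3);
        sketch.insert(5);   // held back: 4 is missing
        System.out.println(sketch.delivered()); // [2, 3]
        sketch.insert(4);   // closes the gap, 4 and 5 are released
        System.out.println(sketch.delivered()); // [2, 3, 4, 5]
    }
}
```

This mirrors the 2,3,5 example from the documentation: message 5 stays in the backlog until 4 arrives. In the sketch, a message that arrives after its gap has timed out is still delivered late; rejecting such messages is what the `rejectOld` option is described as doing.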
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
index f01e376..8de7b75 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
@@ -37,7 +37,7 @@ public interface SequenceElementComparator<E> extends 
Comparator<E> {
      * 
      * @param  o1 a sequence element.
      * @param  o2 a sequence element.
-     * @return    true if its an immediate successor
+     * @return    true if it's an immediate successor
      */
     boolean successor(E o1, E o2);
 
@@ -45,7 +45,7 @@ public interface SequenceElementComparator<E> extends 
Comparator<E> {
      * Returns <tt>true</tt> if the <code>o1</code> can be used in this comparator.
      *
      * @param  o1 a sequence element
-     * @return    true if its usable for this comparator
+     * @return    true if it's usable for this comparator
      */
     boolean isValid(E o1);
 
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java
index ebd2251..070715f 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java
@@ -19,7 +19,6 @@ package org.apache.camel.processor.resequencer;
 /**
  * An interface used by the {@link ResequencerEngine#deliver()} and {@link ResequencerEngine#deliverNext()} methods to
  * send out re-ordered elements.
- *
  */
 public interface SequenceSender<E> {
 
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java
index db80097..abeefd3 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java
@@ -20,12 +20,12 @@ public class IntegerComparator implements 
SequenceElementComparator<Integer> {
 
     @Override
     public boolean predecessor(Integer o1, Integer o2) {
-        return o1.intValue() == (o2.intValue() - 1);
+        return o1 == (o2 - 1);
     }
 
     @Override
     public boolean successor(Integer o1, Integer o2) {
-        return o2.intValue() == (o1.intValue() - 1);
+        return o2 == (o1 - 1);
     }
 
     @Override

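A note on the simplified comparisons in `IntegerComparator`: dropping `intValue()` is safe here because one side of `==` is an `int` expression (`o2 - 1`), so Java unboxes the `Integer` on the other side and compares numerically rather than by reference. The sketch below checks this with values outside the small-value `Integer` cache; `UnboxingComparisonSketch` is a hypothetical class name, and only the two method bodies are taken from the diff.

```java
/** Hypothetical illustration; the method bodies match the updated IntegerComparator. */
class UnboxingComparisonSketch {

    static boolean predecessor(Integer o1, Integer o2) {
        // o2 - 1 is an int, so o1 is unboxed and the comparison is numeric
        return o1 == (o2 - 1);
    }

    static boolean successor(Integer o1, Integer o2) {
        // same reasoning with the operands swapped
        return o2 == (o1 - 1);
    }

    public static void main(String[] args) {
        Integer low = 100_000;   // large enough not to come from the Integer cache
        Integer high = 100_001;
        System.out.println(predecessor(low, high)); // true
        System.out.println(successor(high, low));   // true
        System.out.println(predecessor(low, low));  // false
    }
}
```

As with the earlier `intValue()` version, a `null` argument would fail with a `NullPointerException` during unboxing.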