Support for a stream-processing resequencer.
--------------------------------------------
Key: CAMEL-125
URL: https://issues.apache.org/activemq/browse/CAMEL-125
Project: Apache Camel
Issue Type: Improvement
Components: camel-core, camel-spring
Affects Versions: 1.1.0
Environment: Java 6, Windows XP
Reporter: Martin Krasser
Attachments: camel-core-patch.txt, camel-spring-patch.txt
Attached is a patch that adds a stream-processing resequencer to Camel. The
resequencing algorithm is based on the detection of gaps in a message stream
rather than on a fixed batch size. Gap detection in combination with timeouts
removes the constraint of having to know the number of messages of a sequence
in advance (although a capacity parameter prevents the resequencer from running
out of memory).
Route builder examples for the stream-processing resequencer:
{{from("direct:start").resequencer(header("seqnum")).stream().to("mock:result")}}
is equivalent to:
{{from("direct:start").resequencer(header("seqnum")).stream(StreamResequencerConfig.getDefault()).to("mock:result")}}
Custom values for the resequencer's capacity and timeout can be set as in
this example:
{{from("direct:start").resequencer(header("seqnum")).stream(new StreamResequencerConfig(300, 4000L)).to("mock:result")}}
The XML configuration looks like:
{code:xml}
<camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="direct:start"/>
    <resequencer>
      <simple>in.header.seqnum</simple>
      <to uri="mock:result"/>
      <stream-config capacity="300" timeout="4000"/>
    </resequencer>
  </route>
</camelContext>
{code}
The existing batch-processing resequencer can be defined as usual:
{{from("direct:start").resequencer(header("seqnum")).to("mock:result")}}
which is now equivalent to:
{{from("direct:start").resequencer(header("seqnum")).batch().to("mock:result")}}
It is now also possible to define a custom configuration for the existing
batch-processing resequencer:
{{from("direct:start").resequencer(header("seqnum")).batch(new BatchResequencerConfig(300, 4000L)).to("mock:result")}}
This sets the batchSize to 300 and the batchTimeout to 4000 ms.
For the stream-processing resequencer to work, messages must contain a sequence
number for which a predecessor and a successor are known. For example, a message
with the sequence number 3 has a predecessor message with the sequence number 2
and a successor message with the sequence number 4. The message sequence 2, 3, 5
has a gap because the successor of 3 is missing. The resequencer therefore has
to retain message 5 until message 4 arrives (or a timeout occurs).
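The retain-until-the-gap-closes behaviour can be sketched in a few lines of
Java. This is only an illustration, not the code from the attached patch: the
class GapDetectingBuffer and its methods are hypothetical names, sequence
numbers are plain integers, the timeout is triggered by hand instead of by a
timer, and the capacity limit is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical illustration class; it is not part of the attached patch.
class GapDetectingBuffer {
    private final TreeSet<Integer> retained = new TreeSet<>();
    private final List<Integer> delivered = new ArrayList<>();
    private int lastDelivered;

    // Assumption: the caller knows the sequence number that was delivered last.
    GapDetectingBuffer(int lastDelivered) {
        this.lastDelivered = lastDelivered;
    }

    // Accepts a sequence number, then delivers every message that is now gap-free.
    void insert(int seqnum) {
        if (seqnum <= lastDelivered) {
            return; // late or duplicate message; this sketch simply drops it
        }
        retained.add(seqnum);
        deliverGapFree();
    }

    // Simulates a timeout: stop waiting for the missing message and
    // release the lowest retained one.
    void timeout() {
        if (!retained.isEmpty()) {
            lastDelivered = retained.pollFirst();
            delivered.add(lastDelivered);
            deliverGapFree();
        }
    }

    // Returns the sequence numbers delivered so far, in delivery order.
    List<Integer> delivered() {
        return delivered;
    }

    // Delivers while the lowest retained number is the successor of the
    // last delivered one; stops at the first gap.
    private void deliverGapFree() {
        while (!retained.isEmpty() && retained.first() == lastDelivered + 1) {
            lastDelivered = retained.pollFirst();
            delivered.add(lastDelivered);
        }
    }
}
```

With new GapDetectingBuffer(1), inserting 2, 3 and 5 delivers 2 and 3 and
retains 5. Inserting 4 afterwards delivers 4 and then 5.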
Gap detection is done with strategies that implement the
SequenceNumberComparator<E> interface. In addition to the
java.util.Comparator<E>.compare(E, E) operation, the SequenceNumberComparator<E>
interface defines the predecessor(E, E) and successor(E, E) operations. The
stream resequencer can be configured with custom SequenceNumberComparator<E>
strategies.
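As a rough sketch of what such a strategy might look like for numeric sequence
numbers: the interface below is reconstructed from the description above, and
the boolean return types and the argument order ("is the first argument the
immediate predecessor/successor of the second?") are assumptions. The
implementation class LongSequenceComparator is a hypothetical name and is not
taken from the patch.

```java
import java.util.Comparator;

// Assumed shape of the interface; return types and argument order are guesses.
interface SequenceNumberComparator<E> extends Comparator<E> {
    // True if o1 is the immediate predecessor of o2.
    boolean predecessor(E o1, E o2);

    // True if o1 is the immediate successor of o2.
    boolean successor(E o1, E o2);
}

// Hypothetical strategy for sequence numbers carried as Long values.
class LongSequenceComparator implements SequenceNumberComparator<Long> {
    @Override
    public int compare(Long o1, Long o2) {
        return o1.compareTo(o2);
    }

    @Override
    public boolean predecessor(Long o1, Long o2) {
        return o1.longValue() + 1 == o2.longValue();
    }

    @Override
    public boolean successor(Long o1, Long o2) {
        return o2.longValue() + 1 == o1.longValue();
    }
}
```

Under these assumptions, successor(4L, 3L) is true while successor(5L, 3L) is
false, which is how the gap in the sequence 2, 3, 5 would be detected.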
The stream-processing resequencer uses the same algorithm as the one in
ServiceMix-3.2-SNAPSHOT (servicemix-eip). To avoid compile-time
dependencies on ServiceMix, I've copied the ServiceMix-independent resequencing
engine over to Camel. This redundancy should be removed once Camel and
servicemix-eip are combined (will they be?). I can contribute to this
task, if needed.