Support for a stream-processing resequencer.
--------------------------------------------

                 Key: CAMEL-125
                 URL: https://issues.apache.org/activemq/browse/CAMEL-125
             Project: Apache Camel
          Issue Type: Improvement
          Components: camel-core, camel-spring
    Affects Versions: 1.1.0
         Environment: Java 6, Windows XP
            Reporter: Martin Krasser
         Attachments: camel-core-patch.txt, camel-spring-patch.txt

Attached is a patch that adds a stream-processing resequencer to Camel. The 
resequencing algorithm is based on the detection of gaps in a message stream 
rather than on a fixed batch size. Gap detection in combination with timeouts 
removes the constraint of having to know the number of messages of a sequence 
in advance (although a capacity parameter prevents the resequencer from running 
out of memory) 

Route builder examples for the stream-processing resequencer:

{{from("direct:start").resequencer(header("seqnum")).stream().to("mock:result")}}

is equivalent to:

{{from("direct:start").resequencer(header("seqnum")).stream(StreamResequencerConfig.getDefault()).to("mock:result")}}

Custom values for the resequencer's capacity and timeout can be set like in 
this example:

{{from("direct:start").resequencer(header("seqnum")).stream(new 
StreamResequencerConfig(300, 4000L)).to("mock:result")}}

The XML configuration looks like:

{code:xml}
<camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring";>
  <route>
    <from uri="direct:start"/>
    <resequencer>
      <simple>in.header.seqnum</simple>
      <to uri="mock:result" />
      <stream-config capacity="300", timeout="4000"/>
    </resequencer>
  </route>
</camelContext>
{code}

The existing batch-processing resequencer can be defined as usual:

{{from("direct:start").resequencer(header("seqnum")).to("mock:result")}}

which is now equivalent to 

{{from("direct:start").resequencer(header("seqnum")).batch().to("mock:result")}}

It is now also possible to define a custom configuration for the existing 
batch-processing resequencer:  

{{from("direct:start").resequencer(header("seqnum")).batch(new 
BatchResequencerConfig(300, 4000L)).to("mock:result")}}

This set the batchSize to 300 and the batchTimeout to 4000 ms. 

For the stream-processing resequencer to work, messages must contain a sequence 
number for which a predecessor and a successor is known. For example a message 
with the sequence number 3 has a predecessor message with the sequence number 2 
and a successor message with the sequence number 4. The message sequence 2,3,5 
has a gap because the sucessor of 3 is missing. The resequencer therefore has 
to retain message 5 until message 4 arrives (or a timeout occurs). 

Gap detection is done with strategies that implement the 
SequenceNumberComparator<E> interface. In addition to the 
java.util.Comparator<E>.compare(E, E) operation the SequenceNumberComparator<E> 
interface defines the predecessor(E, E) and successor(E, E) operations. The 
stream resequencer can be configured with cutstom SequenceNumberComparator<E> 
strategies.

The stream-processing resequencer uses the same algorithm as the one in 
ServiceMix-3.2-SNAPSHOT (servicemix-eip). In order to avoid compile-time 
dependencies to ServiceMix I've copied the ServiceMix-independent resequencing 
engine over to Camel. This redundancy should be removed once Camel and 
servicemix-eip are going to be combined (are they?). I can contribute to this 
task, if needed.


-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to