Yeah, that makes a lot of sense. I spent some time drilling through that component's logic and while it would be hard to marry up the two processing models and their use of Synchronizations, it definitely feels like something that belongs in the same JAR.

What's the best way to submit this? Should I raise a JIRA issue and open a pull request against a Camel branch on GitHub?

Jakub

On 13/07/15 13:11, Claus Ibsen wrote:
Hi

And I think a good home for this is camel-sjms, as it's only using the
standard JMS API. And we can have 2 or more components in the same JAR;
we have that in others.

And it may be able to reuse some logic to populate from JMS to Camel
Message, to set headers and handle the different JMS message types there are.

On Tue, Jul 7, 2015 at 12:04 PM, Jakub Korab
<jakub.korab.li...@gmail.com> wrote:
Hi Claus,

I have a copy of it at https://github.com/jkorab/camel-jms-batch. Feel free
to take a look.

Thanks,

Jakub


On 01/07/15 19:26, Claus Ibsen wrote:
Hi

Ah cool. Maybe you could put it on a github repo we can use to take a peek
at?

As JMS is a "big thing" and can also be complicated, we should take some
time to review and see where it fits best in the Camel family.

On Wed, Jul 1, 2015 at 11:03 AM, Jakub Korab
<jakub.korab.li...@gmail.com> wrote:
Hi all,

I have written a consumer-only component that combines aggregation logic
with transacted JMS sessions that I would like to contribute. The component
vastly speeds up message consumption and aggregation without message loss
on failure when compared with using a regular JMS component and aggregator.

The problem that it solves is that when you want to aggregate a set of
messages from JMS and avoid message loss, you typically reach for a
JdbcAggregationRepository. This in turn fetches and writes progressively
larger blobs from the database on receipt of each message, slowing down
linearly in relation to the number of messages consumed, i.e. it
performs progressively worse the larger the batch.

Old way:
from("jms:myQueue")
      .transacted()
      .aggregate(constant(true), myAggStrategy)
          .aggregationRepository(jdbcAggregationRepository)
          .completionSize(100)
          .completionTimeout(500)

This also suffers from the problem that message loss is still possible
between the message broker and the database that stores the aggregated
message (unless you use XA transactions).

The component that I have developed starts a JMS session, and receives
messages synchronously until it meets a completion size, or until a
completion timeout is met, each time calling an AggregationStrategy. Only
when the completion conditions have been matched does it emit the
aggregated message.
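
As a rough illustration of that receive loop (not the component's actual
code), the sketch below uses an in-memory BlockingQueue as a stand-in for
the JMS session and a BinaryOperator as a stand-in for the
AggregationStrategy. The class and method names are hypothetical, and it
assumes the completion timeout is measured from the start of the batch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;

// Hypothetical sketch: a BlockingQueue stands in for a JMS queue/session.
class BatchLoopSketch {

    // Receives synchronously until completionSize items have arrived or
    // completionTimeoutMs has elapsed since this call started (assumed
    // semantics), folding each item into the aggregate via the strategy.
    static <T> T collectBatch(BlockingQueue<T> source, int completionSize,
                              long completionTimeoutMs, BinaryOperator<T> strategy)
            throws InterruptedException {
        long deadline = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(completionTimeoutMs);
        T aggregate = null;
        int received = 0;
        while (received < completionSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // completion timeout reached
            }
            T next = source.poll(remaining, TimeUnit.NANOSECONDS);
            if (next == null) {
                break; // nothing arrived before the timeout
            }
            aggregate = (aggregate == null) ? next : strategy.apply(aggregate, next);
            received++;
        }
        return aggregate; // null when no message arrived at all
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        // Completes on size: two messages are aggregated.
        System.out.println(collectBatch(queue, 2, 500, (x, y) -> x + "," + y));
        // Completes on timeout: only one message is left.
        System.out.println(collectBatch(queue, 2, 100, (x, y) -> x + "," + y));
    }
}
```

The aggregated message is only emitted once one of the two completion
conditions is met, which is the point being made above.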

The component will commit the batch transaction if the Exchange is
processed successfully, or roll the entire thing back on exception, so all
of the original messages will end up back on the queue for re-processing.
In the event of failure of the Camel process, the messages remain on the
broker for re-dispatch.
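
To make the commit/rollback behaviour concrete, here is a minimal
in-memory sketch. It is only an analogy: a Deque plays the broker, and
"rollback" simply puts the in-flight messages back at the front. All names
are hypothetical, and a real broker makes its own decisions about
redelivery order and redelivery limits.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: a Deque stands in for the broker-side queue.
class BatchTransactionSketch {

    // Takes up to batchSize messages and hands them to the processor.
    // Returns true if processing succeeded ("commit"); on exception the
    // messages are restored to the front of the queue ("rollback").
    static boolean processBatch(Deque<String> broker, int batchSize,
                                Consumer<List<String>> processor) {
        List<String> inFlight = new ArrayList<>();
        while (inFlight.size() < batchSize && !broker.isEmpty()) {
            inFlight.add(broker.pollFirst());
        }
        try {
            processor.accept(inFlight);
            return true; // commit: the batch is gone from the queue
        } catch (RuntimeException e) {
            // rollback: restore messages in their original order
            for (int i = inFlight.size() - 1; i >= 0; i--) {
                broker.addFirst(inFlight.get(i));
            }
            return false;
        }
    }

    public static void main(String[] args) {
        Deque<String> broker = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));

        // Downstream failure: the whole batch goes back on the queue.
        boolean committed = processBatch(broker, 2, batch -> {
            throw new IllegalStateException("downstream failure");
        });
        System.out.println(committed + " " + broker);

        // Successful processing: the batch is removed for good.
        committed = processBatch(broker, 2, batch -> { });
        System.out.println(committed + " " + broker);
    }
}
```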

So in terms of "where is my data stored?", the answer is it remains on the
broker until the batch is successfully processed.

New way:

from("aggjms:myQueue?completionSize=100&completionTimeout=500&aggregationStrategy=#myAggStrategy")

The component also allows for setting the number of JMS consumers on the
endpoint, so you can scale out the number of threads that pick up batches.

The transactional behaviour of this (and so its usage) is so different to
the regular JMS and SJMS components that I believe it needs to be its own
component, as opposed to being integrated into one of the others.

I would like to contribute this to Camel. What is the process for doing
this?

Thanks,

Jakub




