Hi Jakub

Looks good a few comments

The component should extend UriEndpointComponent
And the endpoint should have @UriEndpoint and other
annotations for the options. You can take look how its done elsewhere.

We use ObjectHelper.notNull for validate if a parameter is configured
or not, you can use that instead of commons-lang code.

The endpoint should be singleton as its thread-safe

And we usually prefix all those keys with Camel, eg CamelJmsBatchSize
- and dont use dots in the key.

In the consumer stop you do an await, that may in theory wait forever
if something fuckup and keep the thread running and never call the
latch. Maybe have a timeout as fallback.

The consumer should likely not have ActiveMQ dependent code like the
prefix option you set. Maybe put that out as a configuration that
people can configure. Also AMQ has a default of 1000 for prefetch.

The capped wait time at max 100 millis may causes a bit chatty IO as
that will cause each concurrent users to call 10 x sec when idle. That
may lead to a bit of IO over the network. The spring jms component
uses 1000 millis as default. Maybe put that into some option people
can configure.

You may look into using StopWatch for all that time tracking - we have
that in camel-core which may be easier to use.

Maybe consumer.close should be in a try catch and ignore so the
aggregated data can be processed?






On Tue, Jul 7, 2015 at 12:04 PM, Jakub Korab
<jakub.korab.li...@gmail.com> wrote:
> Hi Claus,
>
> I have a copy of it at https://github.com/jkorab/camel-jms-batch. Feel free
> to take a look.
>
> Thanks,
>
> Jakub
>
>
> On 01/07/15 19:26, Claus Ibsen wrote:
>>
>> Hi
>>
>> Ah cool. Maybe you could put it on a github repo we can use to take a peek
>> at?
>>
>> As JMS is "big thing" and also can be complicated, we should have some
>> time to review and see where it fits the best in the Camel family.
>>
>> On Wed, Jul 1, 2015 at 11:03 AM, Jakub Korab
>> <jakub.korab.li...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> I have written a consumer-only component that combines aggregation logic
>>> with transacted JMS sessions that I would like to contribute. The
>>> component
>>> vastly speeds up message consumption and aggregation without message loss
>>> on failure when compared with using a regular JMS component and
>>> aggregator.
>>>
>>> The problem that it solves is that when you want to aggregate a set of
>>> messages from JMS and avoid message loss, you typically reach for a
>>> JdbcAggregationRepository. This in turn fetches and writes progressively
>>> larger blobs from the database on receipt of each message, slowing down
>>> linearly in relation to to the number of messages consumed - i.e. it
>>> performs progressively worse the larger the batch.
>>>
>>> Old way:
>>> from("jms:myQueue")
>>>      .transacted()
>>>      .aggregate(constant(true), myAggStrategy)
>>>          .aggregationRepository(jdbcAggregationRepository)
>>>          .completionSize(100)
>>>          .completionTimeout(500)
>>>
>>> This also suffers from a problem that message loss is still possible
>>> between the message broker and the database that stores the aggregated
>>> message (unless you use XA transactions....).
>>>
>>> The component that I have developed starts a JMS session, and receives
>>> messages synchronously until it meets a completion size, or until a
>>> completion timeout is met, each time calling an AggregationStrategy. Only
>>> when the completion conditions have been matched does it emit the
>>> aggregated message.
>>>
>>> The component will commit the batch transaction if the Exchange is
>>> processed successfully, or roll the entire thing back on exception - so
>>> all
>>> of the original messages will end up back on the queue for re-processing.
>>> In the event of failure of the Camel process, the messages remain on the
>>> broker for re-dispatch.
>>>
>>> So in terms of "where is my data stored?", the answer is it remains on
>>> the
>>> broker until the batch is successfully processed.
>>>
>>> New way:
>>>
>>> from("aggjms:myQueue?completionSize=100&completionTimeout=500&aggregationStrategy=#myAggStrategy")
>>>
>>> The component also allows for setting the number of JMS consumers on the
>>> endpoint, so you can scale out the number of threads that pick up
>>> batches.
>>>
>>> The transactional behaviour of this (and so its usage) is so different to
>>> the regular JMS and SJMS components, that I believe it needs to be it's
>>> own
>>> component, as opposed to being integrated in to one of the others.
>>>
>>> I would like to contribute this to Camel. What is the process for doing
>>> this?
>>>
>>> Thanks,
>>>
>>> Jakub
>>
>>
>>
>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cib...@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Reply via email to