[ 
https://issues.apache.org/jira/browse/CAMEL-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13575839#comment-13575839
 ] 

Claus Ibsen commented on CAMEL-6042:
------------------------------------

Hi Aaron,

Ah good, yeah, then I think your work is on the right track, and we should work 
on getting it folded into camel-core.

                
> AggregateProcessor/AggregationRepository does not deal with optimistic 
> locking - will not work correctly in a distributed environment
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-6042
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6042
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-core
>    Affects Versions: 2.10.3
>         Environment: Glassfish + Gemini Blueprint + Spring 3.2
>            Reporter: Aaron Whiteside
>             Fix For: Future
>
>         Attachments: aggregate-optimistic-locking-support.patch
>
>
> AggregateProcessor/AggregationRepository does not deal with optimistic 
> locking - and will not work correctly in a distributed environment.
> When I started to write a Voldemort-specific AggregationRepository, I saw that 
> the AggregateProcessor does not deal with optimistic locking. It uses a single 
> lock that is specific to the AggregateProcessor instance.
> In a distributed environment where there are many Camel instances on many 
> servers using a shared data store for the AggregationRepository this will not 
> work.
> Consider the following scenario using a persistent/shared 
> AggregationRepository:
> Camel instance A on server A, receives Exchange 1..
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instance B on server B, receives Exchange 2..
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instance A & B at the same time both call..
> # AggregateProcessor calls AggregationStrategy with the new exchange and old 
> null exchange
> # aggregationRepository.add() with the result (the new exchange)
> # Camel instance A succeeds in storing the new exchange.
> # Camel instance B fails with an exception stating that something is already 
> stored using that exchange id.
> ## at this point I could write my AggregationRepository implementation to 
> ignore the existing entry and overwrite it. But this would mean the existing 
> exchange is lost and never aggregated.
> A possible solution would be:
> a) Remove the lock from AggregateProcessor 
>  a1) Put the lock in the MemoryAggregationRepository or 
>  a2) Use a ConcurrentHashMap.putIfAbsent method (and then continue on to do B 
> below).
> b) Introduce an AggregationRepositoryOptimisticLockException (name it 
> whatever you want) that is thrown when an AggregationRepository detects that 
> someone is trying to add() the same exchange id at the same time.
> Upon receiving this exception the AggregateProcessor would re-get() the 
> oldExchange (now not null) from the AggregationRepository and call the 
> AggregationStrategy again to aggregate the old and the new exchanges.
> This would ensure that no exchanges fail to aggregate in a distributed 
> environment, provided that the underlying AggregationRepository is able to 
> detect concurrent add() calls, which most should be able to do (using 
> conditional updates).
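The get/aggregate/add retry loop proposed in (a2) and (b) could be sketched roughly as follows. This is only an illustration under stated assumptions, not the attached patch and not Camel's API: the class names, the three-argument add() signature, and the use of String in place of Exchange are all hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BinaryOperator;

class OptimisticAggregationSketch {

    /** Hypothetical exception, standing in for the one suggested in (b). */
    static class OptimisticLockException extends RuntimeException {
        OptimisticLockException(String message) {
            super(message);
        }
    }

    /** Hypothetical in-memory repository; String stands in for Exchange. */
    static class MemoryRepository {
        private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

        String get(String key) {
            return store.get(key);
        }

        /**
         * Stores newValue only if the entry still equals oldValue
         * (oldValue == null means "no entry existed when it was read").
         */
        void add(String key, String oldValue, String newValue) {
            boolean stored = (oldValue == null)
                    ? store.putIfAbsent(key, newValue) == null
                    : store.replace(key, oldValue, newValue);
            if (!stored) {
                throw new OptimisticLockException("Concurrent update for key " + key);
            }
        }
    }

    /** Re-reads and re-aggregates until the conditional add() succeeds. */
    static String aggregate(MemoryRepository repo, String key, String incoming,
                            BinaryOperator<String> strategy) {
        while (true) {
            String old = repo.get(key);
            // On the first exchange there is nothing to merge with yet.
            String merged = (old == null) ? incoming : strategy.apply(old, incoming);
            try {
                repo.add(key, old, merged);
                return merged;
            } catch (OptimisticLockException e) {
                // Someone else stored a value in between: loop and re-get().
            }
        }
    }
}
```

Passing the previously read value into add() is one way to let the repository detect a stale write. A repository backed by a shared store would replace the map operations with that store's own conditional update.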
> For example:
> A SQL-based repository could try to insert into a table with a unique 
> constraint on the exchange id. When the constraint is violated JPA/JDBC/whatever 
> will throw a unique constraint violation exception which can be converted into 
> an AggregationRepositoryOptimisticLockException.
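For the JDBC case, the conversion might look something like the sketch below. The exception class is the hypothetical one named in the proposal, and the translator class and its method names are made up for illustration. It relies on the standard SQLState class "23" (integrity constraint violation), so it assumes the unique key on the exchange id is the only constraint the insert can violate.

```java
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

class ConstraintViolationTranslator {

    /** Hypothetical exception type, named as in the proposal. */
    static class AggregationRepositoryOptimisticLockException extends RuntimeException {
        AggregationRepositoryOptimisticLockException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** True if the driver reported an integrity constraint violation. */
    static boolean isConstraintViolation(SQLException e) {
        String state = e.getSQLState();
        // Some drivers throw the dedicated subclass, others only set SQLState 23xxx.
        return e instanceof SQLIntegrityConstraintViolationException
                || (state != null && state.startsWith("23"));
    }

    /** Intended to be called from the catch block around the INSERT statement. */
    static RuntimeException translate(String exchangeId, SQLException e) {
        if (isConstraintViolation(e)) {
            // Assumption: the only constraint that can fire is the unique exchange id.
            return new AggregationRepositoryOptimisticLockException(
                    "Exchange id already stored: " + exchangeId, e);
        }
        return new IllegalStateException("Failed to store exchange " + exchangeId, e);
    }
}
```

A repository's add() would then do `throw ConstraintViolationTranslator.translate(id, e);` in its catch block, leaving the retry decision to the caller.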
> HawtDB supports optimistic locking out of the box, throwing an 
> OptimisticUpdateException when it detects concurrent updates. So updating 
> this component to take advantage of this feature should be very simple.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
