[ https://issues.apache.org/jira/browse/CAMEL-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13575839#comment-13575839 ]
Claus Ibsen commented on CAMEL-6042: ------------------------------------ Hi Aaron, Ah good, yeah I then think your work is on the right track, and we should work on getting that folded into the camel-core. > AggregateProcessor/AggregationRepository does not deal with optimistic > locking - will not work correctly in a distributed environment > ------------------------------------------------------------------------------------------------------------------------------------- > > Key: CAMEL-6042 > URL: https://issues.apache.org/jira/browse/CAMEL-6042 > Project: Camel > Issue Type: Improvement > Components: camel-core > Affects Versions: 2.10.3 > Environment: Glassfish + Gemini Blueprint + Spring 3.2 > Reporter: Aaron Whiteside > Fix For: Future > > Attachments: aggregate-optimistic-locking-support.patch > > > AggregateProcessor/AggregationRepository does not deal with optimistic > locking - and will not work correctly in a distributed environment. > I started to write a Voldemort specific AggregationRepository I saw that the > AggregateProcessor does not deal with optimistic locking. It uses a single > AggregateProcessor instance specific lock. > In a distributed environment where there are many Camel instances on many > servers using a shared data store for the AggregationRepository this will not > work. > Consider the following scenario using a persistent/shared > AggregationRepository: > Camel instance A on server A, receives Exchange 1.. > # AggregateProcessor first acquires instance specific lock. > # AggregateProcessor calls oldExchange = AggregationRepository.get() > # oldExchange is null > Camel instance B on server B, receives Exchange 2.. > # AggregateProcessor first acquires instance specific lock. > # AggregateProcessor calls oldExchange = AggregationRepository.get() > # oldExchange is null > Camel instance A & B at the same time both call.. > # AggregateProcessor calls AggregationStrategy with the new exchange and old > null exchange > # aggregationRepository.add() with the result (the new exchange) > # Camel instance A succeeds to store the new exchange. > # Camel instance B fails with an exception stating that something is already > stored using that exchange id. > ## at this point I could write my AggregationRepository implementation to > ignore the existing entry and overwrite it. But this would mean the exiting > exchange is lost and never aggregated. > A possible solution would be: > a) Remove the lock from AggregateProcessor > a1) Put the lock in the MemoryAggregationRepository or > a2) Use a ConcurrentHashMap.putIfAbsent method (and then continue on to do B > below). > b) Introduce an AggregationRepositoryOptimisticLockException (name it > whatever you want) that is thrown when an AggregationRepository detects that > someone is trying to add() the same exchange id at the same time. > Upon receiving this exception the AggregateProcessor would re-get() the > oldExchange (now not null) from the AggregationRepository and call the > AggregationStrategy again to aggregate the old and the new exchanges. > This would ensure that no exchanges fail to aggregate in a distributed > environment. Given that the underlying AggregationRepository is able to > detect concurrent add()'s. Which most should be able to (using conditional > updates). > For example: > SQL could try and insert into a table with a unique constraint on the > exchange id. When the constraint is violated JPA/JDBC/whatever will throw a > unique constraint violation exception which can be converted into a > AggregationRepositoryOptimisticLockException. > And HawtDB supports optimistic locking out of the box, by throwing a > OptimisticUpdateException when it detects concurrent updates. So updating > this component to take advantage of this feature should be very simple. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira