Re: Aggregator race condition in recovery task?

2014-12-10 Thread bendherville
The issue you're experiencing may be related to
https://issues.apache.org/jira/browse/CAMEL-4271





Aggregator race condition in recovery task?

2014-10-29 Thread Archis.Kulkarni
Hi,

We’re using the Camel 2.12.3 library for aggregation in one of our 
applications, with the JdbcAggregationRepository maintaining the state. The 
aggregator is meant to complete on a time interval, i.e. every ‘n’ ms it 
should send out the aggregated output.
We’re frequently seeing duplicate messages generated by the recovery task, 
which is set to run every 5 seconds.

After enabling debug logging and investigating further, I see the following 
sequence of events in the logs:

1. [AggregateTimeoutChecker thread] - AggregateProcessor.run() : Completion interval triggered for correlation key
2. [AggregateTimeoutChecker thread] - JdbcAggregationRepository.doInTransactionWithoutResult() : Removing key
3. [AggregateRecoverChecker thread] - AggregateProcessor.run() : Starting recover check
4. [AggregateRecoverChecker thread] - JdbcAggregationRepository.mapRow() : getKey
5. [AggregateTimeoutChecker thread] - AggregateProcessor.onSubmitCompletion() : Aggregation complete for correlation key
6. [AggregateRecoverChecker thread] - AggregateProcessor.run() : Loading aggregated exchange with id
7. [AggregateTimeoutChecker thread] - AggregateProcessor.run() : Processing aggregated exchange
8. [AggregateRecoverChecker thread] - JdbcAggregationRepository.recover() - Recovering exchangeId
9. [AggregateRecoverChecker thread] - AggregateProcessor.run() - Delivery attempt: 1 to recover aggregated exchange with id
10. [AggregateRecoverChecker thread] - AggregateProcessor.onSubmitCompletion() - Aggregation complete for correlation key


In short, the timeout checker removes the exchange from the XXX table and 
adds it to the XXX_completed table. A little later in the processing, it adds 
the exchange to the “inProgressCompleteExchanges” structure.
The recover task scans the XXX_completed table and checks whether each 
exchange is present in “inProgressCompleteExchanges” before attempting to 
recover it.
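If that description is accurate, the window can be sketched in plain Java. This is only an illustration under stated assumptions: the class, the two in-memory sets and the method names below are hypothetical stand-ins for the XXX_completed table, the in-progress set and the two tasks, not Camel code. The interleaving is replayed on a single thread so the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the two tasks; none of these names come from Camel.
public class RecoveryWindowSketch {
    // Stand-in for the XXX_completed table.
    static final Set<String> completedTable =
            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
    // Stand-in for inProgressCompleteExchanges.
    static final Set<String> inProgress =
            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
    // Records every delivery so duplicates are visible.
    static final List<String> deliveries = new ArrayList<String>();

    // Timeout task, first half: the exchange becomes visible as "completed".
    static void timeoutMoveToCompleted(String id) {
        completedTable.add(id);
    }

    // Timeout task, second half: only now is the exchange marked in progress.
    static void timeoutMarkAndDeliver(String id) {
        inProgress.add(id);
        deliveries.add("timeout:" + id);
    }

    // Recover task: scan completed exchanges, skip those already in progress.
    static void recoverScan() {
        for (String id : completedTable) {
            if (!inProgress.contains(id)) {
                inProgress.add(id);
                deliveries.add("recover:" + id);
            }
        }
    }

    // Replays the suspected interleaving: the scan runs between the two halves.
    static List<String> runInterleaving() {
        completedTable.clear();
        inProgress.clear();
        deliveries.clear();
        timeoutMoveToCompleted("ex-1");
        recoverScan();
        timeoutMarkAndDeliver("ex-1");
        return new ArrayList<String>(deliveries);
    }

    public static void main(String[] args) {
        System.out.println(runInterleaving()); // [recover:ex-1, timeout:ex-1]
    }
}
```

In this model the same exchange is delivered twice even though every individual set operation is thread-safe, because the "completed" state becomes visible before the in-progress marker is set.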

This usage of “inProgressCompleteExchanges” looks dodgy to me as I don’t see 
any explicit locking for it in the recover task while the timeout task may be 
updating it simultaneously.
This is how the set is created in AggregateProcessor:
private final Set<String> inProgressCompleteExchanges = 
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

How is synchronization ensured in this case?
Is there something I’m missing in this analysis? I don’t understand how the 
add and contains methods on this set, called from different threads, are 
synchronized. Any help in this regard will be much appreciated.
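For reference, a set built with Collections.newSetFromMap over a ConcurrentHashMap needs no external lock for individual add and contains calls: the backing map makes each call thread-safe, and add returns true for exactly one caller per key. The sketch below exercises only that JDK behaviour; the class and method names are hypothetical and nothing here is taken from Camel.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it only uses standard JDK collections.
public class ConcurrentSetSketch {

    // Starts `threads` workers that all add the same key at once and counts
    // how many of them saw add() return true.
    static int winnersForOneKey(int threads) {
        final Set<String> set =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        final AtomicInteger winners = new AtomicInteger();
        final CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        start.await(); // release all workers together
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // add() delegates to ConcurrentHashMap.put, which is atomic.
                    if (set.add("ex-1")) {
                        winners.incrementAndGet();
                    }
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(winnersForOneKey(8)); // 1
    }
}
```

What the set cannot do is make a separate contains followed by a later add atomic, or coordinate with a database update made elsewhere. Using the boolean returned by add as the claim on an exchange is one way to close the check-then-act gap on the set itself.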

Thanks,
Archis


