Hi,
We’re using the Camel 2.12.3 library for aggregation in one of our
applications, with JdbcAggregationRepository maintaining the state. The
aggregator is meant to complete on a time interval, i.e. every ‘n’ ms it
should send out the aggregated output.
We’re frequently seeing duplicate messages generated by the recovery task,
which is configured to run every 5 seconds.
After enabling debug logging and investigating further, I see the following
sequence of events in the logs:
1. [AggregateTimeoutChecker thread] - AggregateProcessor.run() :
Completion interval triggered for correlation key
2. [AggregateTimeoutChecker thread] -
JdbcAggregationRepository.doInTransactionWithoutResult() : Removing key
3. [AggregateRecoverChecker thread] - AggregateProcessor.run() :
Starting recover check
4. [AggregateRecoverChecker thread] - JdbcAggregationRepository.mapRow()
: getKey
5. [AggregateTimeoutChecker thread] -
AggregateProcessor.onSubmitCompletion() : Aggregation complete for correlation
key
6. [AggregateRecoverChecker thread] - AggregateProcessor.run() : Loading
aggregated exchange with id
7. [AggregateTimeoutChecker thread] - AggregateProcessor.run() :
Processing aggregated exchange
8. [AggregateRecoverChecker thread] - JdbcAggregationRepository.recover()
- Recovering exchangeId
9. [AggregateRecoverChecker thread] - AggregateProcessor.run() - Delivery
attempt: 1 to recover aggregated exchange with id
10. [AggregateRecoverChecker thread] -
AggregateProcessor.onSubmitCompletion() - Aggregation complete for correlation
key
In short, the timeout checker removes the exchange from the XXX table and
adds it to the XXX_completed table. A little later in the processing, it adds
the exchange to the “inProgressCompleteExchanges” structure.
The recover task scans the XXX_completed table and checks whether each
exchange is present in “inProgressCompleteExchanges” before attempting to
recover it.
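To make the ordering I suspect concrete, here is a minimal single-threaded sketch that replays the interleaving from the log steps above. It is only a model of my reading of the logs, not Camel's actual code: the class RecoverRaceSketch, the method replay, and the in-memory completedTable standing in for the XXX_completed table are all hypothetical names I made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the suspected interleaving; not Camel source code.
public class RecoverRaceSketch {

    // Replays the suspected order of events and returns the deliveries made.
    static List<String> replay(String exchangeId) {
        // Stand-in for the XXX_completed table (assumption: modelled as a set of ids).
        Set<String> completedTable =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        // Built the same way as inProgressCompleteExchanges in AggregateProcessor.
        Set<String> inProgress =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        List<String> deliveries = new ArrayList<String>();

        // Steps 1-2: the timeout checker moves the exchange to the completed table.
        completedTable.add(exchangeId);

        // Steps 3-4 and 6: the recover task scans the completed table before the
        // timeout checker has marked the exchange as in progress.
        for (String id : completedTable) {
            if (!inProgress.contains(id)) {
                deliveries.add("recover:" + id);
            }
        }

        // Steps 5 and 7: only now does the timeout path mark the exchange as
        // in progress and deliver it.
        inProgress.add(exchangeId);
        deliveries.add("timeout:" + exchangeId);

        return deliveries;
    }

    public static void main(String[] args) {
        // The same exchange is delivered once by each task.
        System.out.println(replay("ex-1"));
    }
}
```

If this model is right, the duplicate comes from the window between the move to the completed table and the add to the in-progress set, regardless of how the set itself is implemented.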
This usage of “inProgressCompleteExchanges” looks dodgy to me, as I don’t see
any explicit locking around it in the recover task while the timeout task may
be updating it simultaneously.
This is how the set is created in AggregateProcessor:
private final Set<String> inProgressCompleteExchanges =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
How is synchronization ensured in this case?
Is there something I’m missing in this analysis? I don’t understand how the add
and contains calls on this set, made from different threads, are synchronized.
Any help in this regard would be much appreciated.
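For reference, here is a small sketch of what I understand a set built this way to guarantee on its own. As far as I know, individual add and contains calls on a set backed by ConcurrentHashMap are thread-safe without external locking, and add returns true for exactly one caller per element. The class ClaimSketch and the method countWinners are hypothetical names used only for this illustration; nothing here is taken from Camel.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of per-operation atomicity; not Camel source code.
public class ClaimSketch {

    // Starts several threads that all try to add the same id at once and
    // returns how many of them saw add(...) return true.
    static int countWinners(final String id, int threads) {
        final Set<String> inProgress =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        final AtomicInteger winners = new AtomicInteger();
        final CountDownLatch start = new CountDownLatch(1);

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        start.await(); // release all workers together
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // add is a single atomic put on the backing map, so only
                    // one thread can observe "newly added" for a given id.
                    if (inProgress.add(id)) {
                        winners.incrementAndGet();
                    }
                }
            });
            workers[i].start();
        }

        start.countDown();
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(countWinners("ex-1", 8));
    }
}
```

So my concern is less about a single add or contains call and more about whether a contains check in one thread can run before the matching add in the other, since nothing in the set makes that pair of steps atomic.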
Thanks,
Archis