[
https://issues.apache.org/activemq/browse/CAMEL-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=52103#action_52103
]
Oliver Hecker commented on CAMEL-1650:
--------------------------------------
Thanks Claus.
In the meantime I have been working locally on a solution along the lines I
drafted above.
It would be a major change to the IdempotentRepository interface, and the
interface would no longer be as simple as the "Set-alike" interface of the
current IdempotentRepository.
The main idea is that a key is first "reserved" in the repository, with a
timeout, and is later on either "confirmed" or "released".
I already have a modified MemoryIdempotentRepository and an
IdempotentConsumer implementation working. For the
FileConsumer it is less straightforward, because the lockHandle object needs to
be attached to the Exchange due to the asynchronous processor.
CAMEL-1679 probably provides a more generic solution for this and similar
requirements.
I just want to note that your approach with the {{remove}} method might result
in loss of the message if processing the message fails after executing {{add}}
and removing the key from the repository with {{remove}} also fails. With a
DB-based repository in particular, this can happen when the network connection
is lost while the message is being processed.
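To make that failure sequence concrete, here is a minimal sketch. It is not Camel code: {{FlakyRepository}}, {{consume}} and the {{connected}} flag are hypothetical stand-ins I made up for a DB-backed repository and a consumer that calls {{add}} before processing and {{remove}} on failure.

```java
import java.util.HashSet;
import java.util.Set;

class AddThenRemoveLossSketch {

    /** Hypothetical stand-in for a DB-backed repository whose connection can drop. */
    static class FlakyRepository {
        private final Set<String> keys = new HashSet<>();
        boolean connected = true;

        boolean add(String key) {
            check();
            return keys.add(key);
        }

        boolean remove(String key) {
            check();
            return keys.remove(key);
        }

        private void check() {
            if (!connected) {
                throw new IllegalStateException("connection lost");
            }
        }
    }

    /** Returns true if the message was processed, false if it was skipped as a duplicate. */
    static boolean consume(FlakyRepository repo, String id, Runnable processor) {
        if (!repo.add(id)) {
            return false; // key already present: treated as a duplicate
        }
        try {
            processor.run();
            return true;
        } catch (RuntimeException e) {
            repo.remove(id); // compensating remove; may itself fail and leave the key behind
            throw e;
        }
    }

    public static void main(String[] args) {
        FlakyRepository repo = new FlakyRepository();
        try {
            consume(repo, "msg-1", () -> {
                repo.connected = false; // network drops during processing
                throw new RuntimeException("processing failed");
            });
        } catch (RuntimeException expected) {
            // both the processing and the compensating remove failed
        }

        repo.connected = true; // connection is back and the message is redelivered
        boolean processed = consume(repo, "msg-1", () -> { });
        System.out.println("redelivery processed: " + processed); // prints "redelivery processed: false"
    }
}
```

The redelivered message is skipped as a duplicate although it was never processed successfully. A reservation with a timeout avoids this, because a key that is never confirmed becomes available again without any explicit cleanup call.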
For completeness, here is the IdempotentRepository interface I was
experimenting with:
{code}
import java.io.Serializable;

import org.apache.camel.CamelException;

public interface IdempotentRepository<E> {

    /**
     * Reserves a given key in the repository.
     * <ul>
     * <li>If the key is neither reserved nor confirmed then the key will be
     * reserved and an object will be returned which serves as a handle for
     * either confirming or releasing the key later on.</li>
     * <li>If the key is already confirmed then <code>null</code> will be
     * returned immediately.</li>
     * <li>If the key is reserved then the method will block until
     * <ul>
     * <li>the key is confirmed by the actor which reserved the key, or</li>
     * <li>the key is released by the actor which reserved the key, or</li>
     * <li>the reservation of the original actor has timed out.</li>
     * </ul>
     * In all cases the method will then try to reserve the key and react as
     * indicated above.</li>
     * </ul>
     * On completion of the method it is guaranteed that
     * <ul>
     * <li>the key was successfully reserved, indicated by a returned lock
     * handle object, or</li>
     * <li>the key was already confirmed, indicated by returning
     * <code>null</code>.</li>
     * </ul>
     *
     * @param key the key which should be reserved in the repository
     * @param timeOut how long the key is locked before the lock might be
     *        broken by another actor requesting to reserve the same key. This
     *        is a minimum time; the lock might still be valid after the timeout.
     * @return a lock handle object in case of successful reservation;
     *         <code>null</code> if the key was already confirmed beforehand
     */
    Serializable reserveKey(E key, int timeOut);

    /**
     * Releases the key in the repository which is represented by the given
     * lock handle object.
     * The key is then available again for reservation by this or another actor.
     * If the lock handle object is unknown (which might also be caused by a
     * timed out lock or the lock already being released or confirmed)
     * no action is taken.
     *
     * @param lockHandle the lock handle object which was returned when
     *        reserving the key via the {@link #reserveKey(Object,int)} method
     */
    void releaseKey(Serializable lockHandle);

    /**
     * Confirms the key in the repository which is represented by the given
     * lock handle object.
     *
     * @param lockHandle the lock handle object which was returned when
     *        reserving the key via the {@link #reserveKey(Object,int)} method
     * @throws CamelException if the lock handle object is unknown (which
     *         might also be caused by a timed out lock or the lock already
     *         being released or confirmed)
     */
    void confirmKey(Serializable lockHandle) throws CamelException;
}
{code}
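To show how the three methods are meant to interact, here is a rough in-memory sketch together with the calling pattern a consumer might follow. It is not the modified MemoryIdempotentRepository mentioned above. The class name, the UUID-based handles, the {{processOnce}} helper, the timeout being in milliseconds and the use of IllegalStateException in place of CamelException (to keep the sketch self-contained) are all assumptions for illustration.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical in-memory sketch of the reserve/confirm/release idea; not Camel code. */
class ReservingRepositorySketch<E> {

    private static final class Reservation<K> {
        final K key;
        final long expiresAt;

        Reservation(K key, long expiresAt) {
            this.key = key;
            this.expiresAt = expiresAt;
        }
    }

    private final Set<E> confirmed = new HashSet<>();
    private final Map<E, Serializable> handleByKey = new HashMap<>();
    private final Map<Serializable, Reservation<E>> byHandle = new HashMap<>();

    /** Returns a lock handle, or null if the key was already confirmed. Timeout assumed in ms. */
    public synchronized Serializable reserveKey(E key, int timeOut) {
        while (true) {
            if (confirmed.contains(key)) {
                return null;
            }
            Serializable holder = handleByKey.get(key);
            long now = System.currentTimeMillis();
            if (holder == null || byHandle.get(holder).expiresAt <= now) {
                if (holder != null) {
                    byHandle.remove(holder); // break the timed-out reservation
                }
                Serializable handle = UUID.randomUUID().toString();
                handleByKey.put(key, handle);
                byHandle.put(handle, new Reservation<>(key, now + timeOut));
                return handle;
            }
            try {
                // Block until the holder confirms/releases or its reservation expires.
                wait(Math.max(1, byHandle.get(holder).expiresAt - now));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for key", e);
            }
        }
    }

    /** Unknown or stale handles are ignored. */
    public synchronized void releaseKey(Serializable lockHandle) {
        Reservation<E> r = byHandle.remove(lockHandle);
        if (r != null) {
            handleByKey.remove(r.key);
            notifyAll();
        }
    }

    /** Throws if the handle is unknown, timed out and broken, or already used. */
    public synchronized void confirmKey(Serializable lockHandle) {
        Reservation<E> r = byHandle.remove(lockHandle);
        if (r == null) {
            throw new IllegalStateException("unknown or expired lock handle");
        }
        handleByKey.remove(r.key);
        confirmed.add(r.key);
        notifyAll();
    }

    /** Calling pattern: process only when reserved; confirm on success, release on failure. */
    static <K> boolean processOnce(ReservingRepositorySketch<K> repo, K key, Runnable work) {
        Serializable handle = repo.reserveKey(key, 30_000);
        if (handle == null) {
            return false; // duplicate: already confirmed by another actor
        }
        try {
            work.run();
        } catch (RuntimeException e) {
            repo.releaseKey(handle); // let this or another actor retry
            throw e;
        }
        repo.confirmKey(handle);
        return true;
    }
}
```

Note that if the {{releaseKey}} call is never reached, for example because the process dies, the reservation simply expires and the key can be reserved again, which is the point of attaching a timeout to the reservation.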
If I don't get any other message from you I will stop working on this for now.
> Race condition in IdempotentConsumer
> ------------------------------------
>
> Key: CAMEL-1650
> URL: https://issues.apache.org/activemq/browse/CAMEL-1650
> Project: Apache Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.0-M1
> Reporter: Oliver Hecker
> Assignee: Claus Ibsen
> Fix For: 2.1.0
>
> Attachments: IdempotentConsumerTest.java
>
>
> A possible race condition exists in the IdempotentConsumer
> implementation:
> The code first checks in the MessageIdRepository if the message was already
> processed. If not then it processes the message and
> afterwards adds the id to the repository. (See also
> http://issues.apache.org/activemq/browse/CAMEL-1451). There is no locking
> between the check with "contains" and the insert with "add". So if multiple
> threads/instances try this in parallel for the same id, then
> it might happen that more than one finds the id not yet contained in the
> repository and the same message is processed multiple
> times.
> I enclose an extended version of IdempotentConsumerTest which illustrates the
> problem.
> It is important to note that even if the test demonstrates the issue with a
> MemoryIdempotentRepository a solution should also
> address the case of a database based repository in a clustered environment.
> So this might imply that some locking mechanism on the
> database is required.