Greetings,

Currently I have an aggregation route with approximately the following configuration:

    .aggregate(header("kind"))
    .aggregationRepository(getHdfsFlushRepo())
    .optimisticLockRetryPolicy(new OptimisticLockRetryPolicy().maximumRetries(10).randomBackOff())
    .optimisticLocking()

Adding new exchanges results in an error:

*Optimistic locking failed for exchange with key other: IMap#replace returned no Exchanges, while it's expected to replace one*

The code from HazelcastAggregationRepository that is related to the issue:

    if (!cache.replace(key, oldHolder, newHolder)) {
        LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one", key);
        throw new OptimisticLockingException();
    }

I guess the source of the problem is in the marshalling of the exchange holder: Hazelcast does compare-and-swap based on the serialized binary content and not on the hash code. If the default exchange holder contains any HashMaps (headers, properties, or a HashMap within the body), the binary representations are different even though the hash codes are the same: 70 3F 40 0C 77 74 vs 70 3F 40 77 74.

I'm not sure whether this problem should be fixed at the Hazelcast component level or in Camel core, so I reported a bug: https://issues.apache.org/jira/browse/CAMEL-8438

There are some possible ways to get optimistic locking working with Hazelcast (and probably other storages that have atomic update functionality, like Infinispan):

- Hazelcast supports InMemoryFormat.OBJECT storage, so that ObjectRecordFactory would invoke the DefaultExchangeHolder#equals method. For now, though, it ends up invoking Object#equals, as the current implementation in 2.14 doesn't have an equals() method.
- Think about making the serialisation of the exchange holder produce a more consistent binary form.
- Don't use HashMaps in aggregated exchanges at all. That means no headers either.

Something like that, thank you for reading this.

Cheers,
Serge
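
P.S. To illustrate the effect outside of Camel and Hazelcast, here is a minimal sketch. It assumes plain Java serialization as a stand-in for whatever serializer the Hazelcast map is really configured with, and the class and helper names (MapSerializationDemo, serialize, headers) are made up for the example. Two HashMaps with the same content but a different internal capacity are equal and have the same hash code, yet serialize to different bytes. Copying them into a TreeMap is shown only as one possible way to get a more consistent binary form, not as what the component does.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class MapSerializationDemo {

    // Plain Java serialization; an assumption standing in for the
    // serializer actually used by the Hazelcast map.
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Hypothetical helper: same single entry, different internal table size.
    static Map<String, Object> headers(int initialCapacity) {
        Map<String, Object> map = new HashMap<>(initialCapacity);
        map.put("kind", "other");
        return map;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Object> a = headers(16);
        Map<String, Object> b = headers(64);

        // Logically identical maps...
        System.out.println("equals:     " + a.equals(b));
        System.out.println("same hash:  " + (a.hashCode() == b.hashCode()));

        // ...but HashMap writes its capacity and threshold into the stream,
        // so a byte-wise compare-and-swap sees two different values.
        System.out.println("same bytes: "
                + Arrays.equals(serialize(a), serialize(b)));

        // A TreeMap copy writes only the comparator, size and sorted entries.
        System.out.println("same bytes as TreeMap: "
                + Arrays.equals(serialize(new TreeMap<>(a)), serialize(new TreeMap<>(b))));
    }
}
```

On a standard JDK 8 or later I would expect the first two lines and the last one to print true, and "same bytes" to print false, which matches the behaviour I see with IMap#replace.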