Hi all,
I stumbled across a possibly critical concurrency bug in the
DefaultProducerCache of Camel 4.x under high (concurrent) load, related
to the change from this ticket
https://issues.apache.org/jira/browse/CAMEL-19058 and this commit
https://github.com/apache/camel/commit/921ce519331ac5c8b0a1dfd099f9acbaba4feeab
After upgrading to Camel 4.0 we encountered strange misrouted exchanges
(and therefore errors) in high-load scenarios: wrong producers were
picked from the cache due to the lack of synchronization.
I have created a small unit test which shows the problem (I placed it
in the "DefaultProducerCacheTest" class - just a proof of concept, not
cleaned up at all :D)
---------------
@Test
public void testAcquireConcurrencyIssues() throws InterruptedException, ExecutionException {
    DefaultProducerCache cache = new DefaultProducerCache(this, context, 0);
    cache.start();

    // Warm the cache with three distinct endpoints
    List<Endpoint> endpoints = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        Endpoint e = context.getEndpoint("direct:queue:" + i);
        cache.acquireProducer(e);
        endpoints.add(e);
    }
    assertEquals(3, cache.size());

    // 500 tasks, each acquiring the producer for one of the three endpoints
    // and checking that the returned producer belongs to that endpoint
    ExecutorService ex = Executors.newFixedThreadPool(16);
    List<Callable<Boolean>> callables = new ArrayList<>();
    for (int i = 0; i < 500; i++) {
        int index = i % 3;
        callables.add(() -> {
            Endpoint expected = endpoints.get(index);
            Producer producer = cache.acquireProducer(expected);
            boolean isEqual = producer.getEndpoint().getEndpointUri()
                    .equalsIgnoreCase(expected.getEndpointUri());
            if (!isEqual) {
                System.out.println("Endpoint uri to acquire: " + expected.getEndpointUri()
                        + ", returned producer (uri): " + producer.getEndpoint().getEndpointUri());
            }
            return isEqual;
        });
    }

    try {
        for (int i = 0; i < 100; i++) {
            System.out.println("Iteration: " + (i + 1));
            List<Future<Boolean>> results = ex.invokeAll(callables);
            for (Future<Boolean> future : results) {
                assertEquals(true, future.get());
            }
        }
    } finally {
        // Release the worker threads even if an assertion fails
        ex.shutdownNow();
    }
}
---------------
It fails on my machine 10 out of 10 times.
If I synchronize the read in the "acquireProducer" method of the
"DefaultProducerCache" (also just ugly and for testing purposes, no
performance testing done), the test is green every time.
Changes in the "DefaultProducerCache":
-----------------
@Override
public AsyncProducer acquireProducer(Endpoint endpoint) {
    // Try to favor thread locality as some data in the producer's cache may be shared among threads,
    // triggering cases of false sharing
    synchronized (this) {
        if (endpoint == lastUsedEndpoint && endpoint.isSingletonProducer()) {
            return lastUsedProducer;
        }
    }

    try {
        AsyncProducer producer = producers.acquire(endpoint);
        if (statistics != null) {
            statistics.onHit(endpoint.getEndpointUri());
        }

        synchronized (this) {
            lastUsedEndpoint = endpoint;
            lastUsedProducer = producer;
        }

        return producer;
    } catch (Exception e) {
        throw new FailedToCreateProducerException(endpoint, e);
    }
}
-----------------
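With the current approach of only synchronizing the write to the
variables lastUsedEndpoint/lastUsedProducer, other threads can still
read the variables in between: a reader can compare against
lastUsedEndpoint from one update and then return lastUsedProducer from
a different update, so it gets a producer returned that does not match
the requested endpoint.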
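As a possible alternative to synchronizing the read, the endpoint and
producer could be published together as one immutable pair behind a
single volatile reference, so a reader can never combine the endpoint
of one update with the producer of another. The following is only a
minimal sketch of that idea and not Camel code: LastUsedCache, Entry,
the backing map and the factory function are hypothetical stand-ins
(K plays the role of Endpoint, V the role of AsyncProducer), and the
isSingletonProducer() check is left out. No performance testing has
been done on it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for the "last used" fast path; not the Camel implementation.
final class LastUsedCache<K, V> {

    // Immutable pair: key and value can only ever be observed together.
    private static final class Entry<K, V> {
        final K key;
        final V value;

        Entry(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for the real producer pool behind the fast path.
    private final Map<K, V> backing = new ConcurrentHashMap<>();
    private final Function<K, V> factory;

    // One volatile reference instead of two independently written fields.
    private volatile Entry<K, V> lastUsed;

    LastUsedCache(Function<K, V> factory) {
        this.factory = factory;
    }

    V acquire(K key) {
        // Read the reference exactly once; the snapshot cannot change afterwards.
        Entry<K, V> snapshot = lastUsed;
        if (snapshot != null && snapshot.key == key) {
            return snapshot.value;
        }

        V value = backing.computeIfAbsent(key, factory);
        // Publish key and value in a single write.
        lastUsed = new Entry<>(key, value);
        return value;
    }
}
```

Because the fast path works on a local snapshot, a concurrent update
can at worst cause a miss that falls through to the backing lookup; it
cannot return the value of a different key.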
Best regards,
Peter Nowak
--
Peter Nowak
ecosio GmbH
Lange Gasse 30 | 1080 Wien | Austria
VAT number: ATU68241501, FN 405017p, Commercial Court Vienna
Managing Directors: Christoph Ebm, Philipp Liegl, Marco Zapletal