Hi all,

I stumbled across a potentially critical concurrency bug in the DefaultProducerCache of Camel 4.x under high (concurrent) load. It is related to the change from this ticket https://issues.apache.org/jira/browse/CAMEL-19058 and this commit https://github.com/apache/camel/commit/921ce519331ac5c8b0a1dfd099f9acbaba4feeab
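Stripped of the Camel specifics, the fast path added by that commit remembers the last endpoint and its producer in two separate fields and checks one before returning the other. That is a check-then-act sequence: a lock around the writes alone cannot stop a reader from matching the endpoint of one write and then returning the producer of a later one. Below is a minimal sketch of one lock-free alternative, assuming Java 17 (for records). It publishes the pair as a single immutable object through one `volatile` field, so every read sees a consistent snapshot. `LastUsedCacheDemo`, `SnapshotCache`, `Entry` and `countMismatches` are hypothetical names, not Camel API, and this is not necessarily how the Camel maintainers would fix it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class LastUsedCacheDemo {

    // Immutable (key, value) pair: both halves are published together as one object.
    record Entry<K, V>(K key, V value) {}

    // Hypothetical stand-in for a producer cache: key ~ endpoint, value ~ producer.
    static final class SnapshotCache<K, V> {
        private final Map<K, V> backing = new ConcurrentHashMap<>();
        private final Function<K, V> factory;
        // One volatile reference instead of two fields: a single read yields a consistent pair.
        private volatile Entry<K, V> lastUsed;

        SnapshotCache(Function<K, V> factory) {
            this.factory = factory;
        }

        V acquire(K key) {
            Entry<K, V> snapshot = lastUsed; // read the shared field exactly once
            if (snapshot != null && snapshot.key() == key) {
                return snapshot.value(); // this value was stored together with this key
            }
            V value = backing.computeIfAbsent(key, factory);
            lastUsed = new Entry<>(key, value); // publish key and value in one write
            return value;
        }
    }

    // Same shape as the proof-of-concept test: 16 threads, 500 tasks per round, 3 keys.
    static int countMismatches(int rounds) {
        List<String> keys = List.of("direct:queue:0", "direct:queue:1", "direct:queue:2");
        SnapshotCache<String, String> cache = new SnapshotCache<>(k -> "producer-for-" + k);
        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            String key = keys.get(i % keys.size());
            tasks.add(() -> cache.acquire(key).equals("producer-for-" + key));
        }
        ExecutorService pool = Executors.newFixedThreadPool(16);
        try {
            int mismatches = 0;
            for (int round = 0; round < rounds; round++) {
                for (Future<Boolean> f : pool.invokeAll(tasks)) {
                    if (!f.get()) {
                        mismatches++;
                    }
                }
            }
            return mismatches;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + countMismatches(100));
    }
}
```

Running `main` should print `mismatches: 0`, because a returned value always comes from the same `Entry` object as the key it was compared against. Compared with synchronizing the read, this keeps the hot path free of monitor acquisition at the cost of one small allocation whenever the last-used key changes. Which variant performs better under real load has not been measured here.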
We encountered strange misrouted exchanges (and therefore errors) under high load after upgrading to Camel 4.0: the wrong producers were picked from the cache due to the lack of synchronization.

I have created a small unit test that shows the problem (I placed it in the "DefaultProducerCacheTest" class; it's just a proof of concept, not cleaned up at all :D):

---------------
@Test
public void testAcquireConcurrencyIssues() throws InterruptedException, ExecutionException {
    DefaultProducerCache cache = new DefaultProducerCache(this, context, 0);
    cache.start();

    // Pre-populate the cache with producers for three distinct endpoints
    List<Endpoint> endpoints = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        Endpoint e = context.getEndpoint("direct:queue:" + i);
        AsyncProducer p = cache.acquireProducer(e);
        endpoints.add(e);
    }
    assertEquals(3, cache.size());

    ExecutorService ex = Executors.newFixedThreadPool(16);
    List<Callable<Boolean>> callables = new ArrayList<>();
    for (int i = 0; i < 500; i++) {
        int index = i % 3;
        callables.add(() -> {
            // Acquire a producer and check that it belongs to the endpoint we asked for
            Producer producer = cache.acquireProducer(endpoints.get(index));
            boolean isEqual = producer.getEndpoint().getEndpointUri()
                    .equalsIgnoreCase(endpoints.get(index).getEndpointUri());
            if (!isEqual) {
                System.out.println("Endpoint uri to acquire: " + endpoints.get(index).getEndpointUri()
                        + ", returned producer (uri): " + producer.getEndpoint().getEndpointUri());
            }
            return isEqual;
        });
    }

    try {
        // Run all tasks concurrently, 100 times over
        for (int i = 0; i < 100; i++) {
            System.out.println("Iteration: " + (i + 1));
            List<Future<Boolean>> results = ex.invokeAll(callables);
            for (Future<Boolean> future : results) {
                assertEquals(true, future.get());
            }
        }
    } finally {
        ex.shutdown();
    }
}
---------------

It fails on my machine 10 out of 10 times. If I synchronize the read in the "acquireProducer" method of "DefaultProducerCache" (also just ugly, for testing purposes only; no performance testing done), the test is green every time.

Changes in "DefaultProducerCache":

-----------------
@Override
public AsyncProducer acquireProducer(Endpoint endpoint) {
    // Try to favor thread locality as some data in the producer's cache may be shared among threads,
    // triggering cases of false sharing
    synchronized (this) {
        // Check and read under the same lock, so the endpoint and producer come from the same write
        if (endpoint == lastUsedEndpoint && endpoint.isSingletonProducer()) {
            return lastUsedProducer;
        }
    }
    try {
        AsyncProducer producer = producers.acquire(endpoint);
        if (statistics != null) {
            statistics.onHit(endpoint.getEndpointUri());
        }
        synchronized (this) {
            lastUsedEndpoint = endpoint;
            lastUsedProducer = producer;
        }
        return producer;
    } catch (Exception e) {
        throw new FailedToCreateProducerException(endpoint, e);
    }
}
-----------------

With the current approach of synchronizing only the writes to lastUsedEndpoint/lastUsedProducer, other threads can still read those fields without the lock. A thread can pass the endpoint check and then read a lastUsedProducer that another thread has just replaced, so it gets back a producer that does not match the requested endpoint.

Best regards,
Peter Nowak

--
Peter Nowak
peter.no...@ecosio.com
ecosio GmbH
Lange Gasse 30 | 1080 Wien | Austria
VAT number: ATU68241501, FN 405017p, Commercial Court Vienna
Managing Directors: Christoph Ebm, Philipp Liegl, Marco Zapletal