Hi, I am interested in investigating this. Could you please create a ticket [1] in our Jira and attach the reproducer?
1. https://camel.apache.org/camel-core/contributing/#_reporting_a_bug_or_problem

Kind regards

On Fri, Nov 10, 2023 at 3:50 PM Peter Nowak <peter.no...@ecosio.com.invalid> wrote:

> Hi all,
>
> I stumbled across a potentially critical concurrency bug in the
> DefaultProducerCache of Camel 4.x under high (concurrent) load, related
> to the change from this ticket
> https://issues.apache.org/jira/browse/CAMEL-19058 and this commit
> https://github.com/apache/camel/commit/921ce519331ac5c8b0a1dfd099f9acbaba4feeab
>
> We encountered strange misrouted exchanges (and therefore errors) after
> upgrading to Camel 4.0 in high-load scenarios: wrong producers
> were picked from the cache due to the lack of synchronization.
>
> I have created a small unit test which shows the problem (I placed it
> in the "DefaultProducerCacheTest" class; just a proof of concept, not
> cleaned up at all :D)
> ---------------
>
> @Test
> public void testAcquireConcurrencyIssues() throws InterruptedException, ExecutionException {
>     DefaultProducerCache cache = new DefaultProducerCache(this, context, 0);
>     cache.start();
>     List<Endpoint> endpoints = new ArrayList<>();
>     for (int i = 0; i < 3; i++) {
>         Endpoint e = context.getEndpoint("direct:queue:" + i);
>         AsyncProducer p = cache.acquireProducer(e);
>         endpoints.add(e);
>     }
>
>     assertEquals(3, cache.size());
>
>     ExecutorService ex = Executors.newFixedThreadPool(16);
>
>     List<Callable<Boolean>> callables = new ArrayList<>();
>
>     for (int i = 0; i < 500; i++) {
>         int index = i % 3;
>         callables.add(() -> {
>             Producer producer = cache.acquireProducer(endpoints.get(index));
>             boolean isEqual = producer.getEndpoint().getEndpointUri()
>                     .equalsIgnoreCase(endpoints.get(index).getEndpointUri());
>
>             if (!isEqual) {
>                 System.out.println("Endpoint uri to acquire: " + endpoints.get(index).getEndpointUri()
>                         + ", returned producer (uri): " + producer.getEndpoint().getEndpointUri());
>             }
>
>             return isEqual;
>         });
>     }
>
>     for (int i = 0; i < 100; i++) {
>         System.out.println("Iteration: " + (i + 1));
>         List<Future<Boolean>> results = ex.invokeAll(callables);
>         for (Future<Boolean> future : results) {
>             assertEquals(true, future.get());
>         }
>     }
> }
> ---------------
>
> It fails on my machine 10 out of 10 times.
>
> If I synchronize the read (also just ugly, for testing purposes; no
> performance testing done) in the "acquireProducer" method of
> "DefaultProducerCache", the test is green every time.
>
> Changes in "DefaultProducerCache":
> -----------------
>
> @Override
> public AsyncProducer acquireProducer(Endpoint endpoint) {
>     // Try to favor thread locality as some data in the producer's cache may be shared among threads,
>     // triggering cases of false sharing
>     synchronized (this) {
>         if (endpoint == lastUsedEndpoint && endpoint.isSingletonProducer()) {
>             return lastUsedProducer;
>         }
>     }
>
>     try {
>         AsyncProducer producer = producers.acquire(endpoint);
>         if (statistics != null) {
>             statistics.onHit(endpoint.getEndpointUri());
>         }
>
>         synchronized (this) {
>             lastUsedEndpoint = endpoint;
>             lastUsedProducer = producer;
>         }
>
>         return producer;
>     } catch (Exception e) {
>         throw new FailedToCreateProducerException(endpoint, e);
>     }
> }
> -----------------
>
> With the current approach of synchronizing only the writes to the fields
> lastUsedEndpoint/lastUsedProducer, other threads can still read the two
> fields in between, matching lastUsedEndpoint from one update while getting
> lastUsedProducer from another, and so receive a producer that does not
> match the requested endpoint.
>
> Best regards,
> Peter Nowak
>
> --
> Peter Nowak
> peter.no...@ecosio.com
> ecosio GmbH
> Lange Gasse 30 | 1080 Wien | Austria
> VAT number: ATU68241501, FN 405017p, Commercial Court Vienna
> Managing Directors: Christoph Ebm, Philipp Liegl, Marco Zapletal

--
Otavio R. Piske
http://orpiske.net
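For comparison, here is a minimal sketch (not Camel's actual code or fix) of a lock-free alternative to synchronizing the read in acquireProducer: instead of two separate fields, keep the last used endpoint and producer together in one immutable holder published through a single volatile field. Each call reads that reference exactly once, so the endpoint it compares and the producer it returns always come from the same update. The names LastUsedCache, Entry, acquire and loader are hypothetical; the loader function stands in for producers.acquire(endpoint), and the isSingletonProducer() check is left out for brevity. The demo mirrors the quoted unit test (16 threads, 500 tasks, 100 rounds).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Demo: hammer the cache from 16 threads, like the quoted unit test,
// and check that every call returns the value for the key it asked for.
class LastUsedCacheDemo {
    public static void main(String[] args) throws Exception {
        LastUsedCache<String, String> cache = new LastUsedCache<>(k -> "producer:" + k);
        List<String> keys = List.of("direct:queue:0", "direct:queue:1", "direct:queue:2");

        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            String key = keys.get(i % 3);
            tasks.add(() -> cache.acquire(key).equals("producer:" + key));
        }

        ExecutorService pool = Executors.newFixedThreadPool(16);
        try {
            for (int round = 0; round < 100; round++) {
                for (Future<Boolean> result : pool.invokeAll(tasks)) {
                    if (!result.get()) {
                        throw new AssertionError("got a value for the wrong key");
                    }
                }
            }
        } finally {
            pool.shutdown();
        }
        System.out.println("all lookups returned matching values");
    }
}

// Hypothetical stand-in for the "last used endpoint/producer" shortcut.
final class LastUsedCache<K, V> {

    // Key and value travel together in one immutable object, so a reader
    // can never combine a key from one update with a value from another.
    private static final class Entry<K, V> {
        final K key;
        final V value;

        Entry(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    private final Function<K, V> loader;    // stands in for producers.acquire(endpoint)
    private volatile Entry<K, V> lastUsed;  // the only shared mutable state

    LastUsedCache(Function<K, V> loader) {
        this.loader = loader;
    }

    V acquire(K key) {
        Entry<K, V> snapshot = lastUsed;    // single volatile read per call
        if (snapshot != null && snapshot.key == key) {
            return snapshot.value;          // value belongs to the key just compared
        }
        V value = loader.apply(key);
        lastUsed = new Entry<>(key, value); // one volatile write publishes both together
        return value;
    }
}
```

Two threads that miss at the same time may both call the loader, and the last write wins; that costs at most an extra lookup and never pairs a key with the wrong value. The sketch assumes the loader itself is thread-safe, as a shared producer pool would need to be.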