Hi, I am interested in investigating this.

Please, can you create a ticket [1] on our Jira and attach the reproducer?

[1] https://camel.apache.org/camel-core/contributing/#_reporting_a_bug_or_problem

Kind regards

On Fri, Nov 10, 2023 at 3:50 PM Peter Nowak <peter.no...@ecosio.com.invalid>
wrote:

> Hi all,
>
> I stumbled across a possibly critical concurrency bug in the
> DefaultProducerCache of Camel 4.x under high (concurrent) load, related
> to the change from this ticket
> https://issues.apache.org/jira/browse/CAMEL-19058 and this commit:
>
> https://github.com/apache/camel/commit/921ce519331ac5c8b0a1dfd099f9acbaba4feeab
>
> We encountered strange misrouted exchanges (and therefore errors) after
> upgrading to Camel 4.0 under high-load scenarios: wrong producer
> templates were picked from the cache due to the lack of
> synchronization.
>
> I have created a small unit test which shows the problem (I placed it
> in the "DefaultProducerCacheTest" class - just a proof of concept, not
> cleaned up at all :D)
> ---------------
>
> // Requires java.util.{ArrayList, List} and
> // java.util.concurrent.{Callable, ExecutionException, ExecutorService, Executors, Future}.
> @Test
> public void testAcquireConcurrencyIssues() throws InterruptedException, ExecutionException {
>     DefaultProducerCache cache = new DefaultProducerCache(this, context, 0);
>     cache.start();
>
>     // Warm the cache with three distinct endpoints.
>     List<Endpoint> endpoints = new ArrayList<>();
>     for (int i = 0; i < 3; i++) {
>         Endpoint e = context.getEndpoint("direct:queue:" + i);
>         cache.acquireProducer(e);
>         endpoints.add(e);
>     }
>
>     assertEquals(3, cache.size());
>
>     ExecutorService ex = Executors.newFixedThreadPool(16);
>
>     // Each task acquires a producer and checks that it belongs to the requested endpoint.
>     List<Callable<Boolean>> callables = new ArrayList<>();
>     for (int i = 0; i < 500; i++) {
>         int index = i % 3;
>         callables.add(() -> {
>             Endpoint expected = endpoints.get(index);
>             Producer producer = cache.acquireProducer(expected);
>             boolean isEqual = producer.getEndpoint().getEndpointUri()
>                     .equalsIgnoreCase(expected.getEndpointUri());
>
>             if (!isEqual) {
>                 System.out.println("Endpoint uri to acquire: " + expected.getEndpointUri()
>                         + ", returned producer (uri): " + producer.getEndpoint().getEndpointUri());
>             }
>
>             return isEqual;
>         });
>     }
>
>     try {
>         // Repeat the whole batch to make the race more likely to show up.
>         for (int i = 0; i < 100; i++) {
>             System.out.println("Iteration: " + (i + 1));
>             List<Future<Boolean>> results = ex.invokeAll(callables);
>             for (Future<Boolean> future : results) {
>                 assertEquals(true, future.get());
>             }
>         }
>     } finally {
>         // Release the worker threads even when an assertion fails.
>         ex.shutdownNow();
>     }
> }
> ---------------
>
> It fails on my machine 10 out of 10 times.
>
> If I synchronize the read (also just ugly, for testing purposes; no
> performance testing done) in the "acquireProducer" method of the
> "DefaultProducerCache", the test is green every time:
>
> Changes in the "DefaultProducerCache":
> -----------------
>
> @Override
> public AsyncProducer acquireProducer(Endpoint endpoint) {
>     // Try to favor thread locality as some data in the producer's cache may be
>     // shared among threads, triggering cases of false sharing
>     synchronized (this) {
>         if (endpoint == lastUsedEndpoint && endpoint.isSingletonProducer()) {
>             return lastUsedProducer;
>         }
>     }
>
>     try {
>         AsyncProducer producer = producers.acquire(endpoint);
>         if (statistics != null) {
>             statistics.onHit(endpoint.getEndpointUri());
>         }
>
>         synchronized (this) {
>             lastUsedEndpoint = endpoint;
>             lastUsedProducer = producer;
>         }
>
>         return producer;
>     } catch (Exception e) {
>         throw new FailedToCreateProducerException(endpoint, e);
>     }
> }
> -----------------
>
> With the current approach of only synchronizing the write to the
> variables lastUsedEndpoint/lastUsedProducer, other threads can still
> read them between the two assignments and get back a producer that
> does not match the requested endpoint.
>
> Best regards,
> Peter Nowak
>
> --
> Peter Nowak
> peter.no...@ecosio.com
> ecosio GmbH
> Lange Gasse 30 | 1080 Wien | Austria
> VAT number: ATU68241501, FN 405017p, Commercial Court Vienna
> Managing Directors: Christoph Ebm, Philipp Liegl, Marco Zapletal
>
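
For illustration only: the race described in the quoted message comes from
reading two separate fields that are updated one after the other. One possible
alternative to locking the fast path is to publish both values together as a
single immutable pair through one volatile field. The sketch below is not Camel
code and has not been performance tested. LastUsedPairSketch, Pair,
LastUsedCache and countMismatches are hypothetical names, and plain strings
stand in for endpoints and producers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class LastUsedPairSketch {

    /** Immutable (key, value) holder; hypothetical, not a Camel type. */
    static final class Pair<K, V> {
        final K key;
        final V value;

        Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical cache with a "last used" fast path in front of a slower lookup. */
    static final class LastUsedCache<K, V> {
        private final Function<K, V> lookup;
        // Both values travel in one reference, so a reader sees the old pair
        // or the new pair, never a mix of the two.
        private volatile Pair<K, V> lastUsed;

        LastUsedCache(Function<K, V> lookup) {
            this.lookup = lookup;
        }

        V acquire(K key) {
            Pair<K, V> snapshot = lastUsed; // read the shared reference exactly once
            if (snapshot != null && snapshot.key == key) {
                return snapshot.value;
            }
            V value = lookup.apply(key);
            lastUsed = new Pair<>(key, value); // publish key and value together
            return value;
        }
    }

    /** Hammers the cache from several threads and counts results that do not match the key. */
    public static int countMismatches(int threads, int tasks) throws Exception {
        List<String> keys = List.of("direct:queue:0", "direct:queue:1", "direct:queue:2");
        LastUsedCache<String, String> cache = new LastUsedCache<>(k -> "producer-for-" + k);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Boolean>> work = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                String key = keys.get(i % keys.size());
                work.add(() -> cache.acquire(key).equals("producer-for-" + key));
            }
            int mismatches = 0;
            for (Future<Boolean> f : pool.invokeAll(work)) {
                if (!f.get()) {
                    mismatches++;
                }
            }
            return mismatches;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("mismatches: " + countMismatches(16, 5000));
    }
}
```

Because the fast path reads the reference once and the pair is immutable, a
hit can only return the value that was stored together with that key. Whether
this or the synchronized variant from the quoted message is preferable for
DefaultProducerCache is an open question that would need benchmarking.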


-- 
Otavio R. Piske
http://orpiske.net
