Hi all,

I stumbled across a possibly critical concurrency bug in the
DefaultProducerCache of Camel 4.x that shows up under high concurrent
load. It is related to the change from this ticket
https://issues.apache.org/jira/browse/CAMEL-19058 and this commit
https://github.com/apache/camel/commit/921ce519331ac5c8b0a1dfd099f9acbaba4feeab

After upgrading to Camel 4.0 we encountered strange misrouted exchanges
(and therefore errors) in high-load scenarios: wrong producers were
picked from the cache due to the lack of synchronization.

I have created a small unit test which shows the problem (I placed it
in the "DefaultProducerCacheTest" class - just a proof of concept, not
cleaned up at all :D)
---------------

@Test
public void testAcquireConcurrencyIssues() throws InterruptedException, ExecutionException {
    DefaultProducerCache cache = new DefaultProducerCache(this, context, 0);
    cache.start();
    List<Endpoint> endpoints = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        Endpoint e = context.getEndpoint("direct:queue:" + i);
        AsyncProducer p = cache.acquireProducer(e);
        endpoints.add(e);
    }

    assertEquals(3, cache.size());

    ExecutorService ex = Executors.newFixedThreadPool(16);

    List<Callable<Boolean>> callables = new ArrayList<>();

    for(int i = 0; i < 500; i++) {
        int index = i % 3;
        callables.add(() -> {
            Producer producer = cache.acquireProducer(endpoints.get(index));
            boolean isEqual = producer.getEndpoint().getEndpointUri()
                    .equalsIgnoreCase(endpoints.get(index).getEndpointUri());

            if(!isEqual) {
                System.out.println("Endpoint uri to acquire: "
                        + endpoints.get(index).getEndpointUri()
                        + ", returned producer (uri): "
                        + producer.getEndpoint().getEndpointUri());
            }

            return isEqual;
        });
    }

    for (int i = 0; i < 100; i++) {
        System.out.println("Iteration: " + (i + 1));
        List<Future<Boolean>> results = ex.invokeAll(callables);
        for (Future<Boolean> future : results) {
            assertEquals(true, future.get());
        }
    }

    // Release the worker threads once all iterations have passed
    ex.shutdown();
}
---------------

The test fails on my machine 10 out of 10 times.

If I also synchronize the read in the "acquireProducer" method of
"DefaultProducerCache" (again just an ugly change for testing purposes,
no performance testing done), the test is green every time:

Changes in the "DefaultProducerCache":
-----------------

@Override
public AsyncProducer acquireProducer(Endpoint endpoint) {
    // Try to favor thread locality as some data in the producer's cache
    // may be shared among threads, triggering cases of false sharing
    synchronized (this) {
        if (endpoint == lastUsedEndpoint && endpoint.isSingletonProducer()) {
            return lastUsedProducer;
        }
    }

    try {
        AsyncProducer producer = producers.acquire(endpoint);
        if (statistics != null) {
            statistics.onHit(endpoint.getEndpointUri());
        }

        synchronized (this) {
            lastUsedEndpoint = endpoint;
            lastUsedProducer = producer;
        }

        return producer;
    } catch (Exception e) {
        throw new FailedToCreateProducerException(endpoint, e);
    }
}
-----------------

With the current approach, only the writes to lastUsedEndpoint and
lastUsedProducer are synchronized. Other threads can still read the
two variables in between those writes, so a thread may see its own
endpoint in lastUsedEndpoint and then be handed a lastUsedProducer
that does not match the requested endpoint.
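
For illustration, here is a minimal, self-contained sketch of one
possible alternative to the synchronized blocks: publish the endpoint
and its producer together as a single immutable pair through one
volatile field, so a reader can never observe a mixed pair. This is
not Camel code and has not been tested against Camel. FakeEndpoint,
FakeProducer, LastUsed and LastUsedCacheSketch are hypothetical
stand-ins, and the isSingletonProducer() check is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class LastUsedCacheSketch {

    // Hypothetical stand-ins for Camel's Endpoint and AsyncProducer.
    static final class FakeEndpoint {
        final String uri;
        FakeEndpoint(String uri) { this.uri = uri; }
    }

    static final class FakeProducer {
        final FakeEndpoint endpoint;
        FakeProducer(FakeEndpoint endpoint) { this.endpoint = endpoint; }
    }

    // Immutable pair: endpoint and producer are always published together.
    private static final class LastUsed {
        final FakeEndpoint endpoint;
        final FakeProducer producer;
        LastUsed(FakeEndpoint endpoint, FakeProducer producer) {
            this.endpoint = endpoint;
            this.producer = producer;
        }
    }

    private final ConcurrentMap<FakeEndpoint, FakeProducer> producers = new ConcurrentHashMap<>();
    private volatile LastUsed lastUsed;

    FakeProducer acquireProducer(FakeEndpoint endpoint) {
        // One volatile read yields a consistent (endpoint, producer) snapshot.
        LastUsed snapshot = lastUsed;
        if (snapshot != null && snapshot.endpoint == endpoint) {
            return snapshot.producer;
        }
        FakeProducer producer = producers.computeIfAbsent(endpoint, FakeProducer::new);
        // One volatile write publishes both values at once.
        lastUsed = new LastUsed(endpoint, producer);
        return producer;
    }

    // Hammers the cache from several threads and counts wrong producers.
    static int countMismatches(int threads, int callsPerThread) {
        LastUsedCacheSketch cache = new LastUsedCacheSketch();
        List<FakeEndpoint> endpoints = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            endpoints.add(new FakeEndpoint("direct:queue:" + i));
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                tasks.add(() -> {
                    int mismatches = 0;
                    for (int i = 0; i < callsPerThread; i++) {
                        FakeEndpoint wanted = endpoints.get(i % 3);
                        if (cache.acquireProducer(wanted).endpoint != wanted) {
                            mismatches++;
                        }
                    }
                    return mismatches;
                });
            }
            int total = 0;
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling LastUsedCacheSketch.countMismatches(16, 10000) should report
no mismatches, because the fast path only ever returns a producer
taken from the same snapshot as the endpoint it was compared against.
Whether this keeps the thread-locality benefit the original change was
after would still need measuring.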

Best regards,
Peter Nowak

-- 
Peter Nowak
peter.no...@ecosio.com
ecosio GmbH
Lange Gasse 30 | 1080 Wien | Austria
VAT number: ATU68241501, FN 405017p, Commercial Court Vienna
Managing Directors: Christoph Ebm, Philipp Liegl, Marco Zapletal
