Hi,

In running some concurrency tests, where multiple threads do simultaneous puts against a cache, some of the puts appear to be lost. I run a set of five threads to do puts, then a set of five threads to do gets. Many times, not all of the keys that were put into the cache are present afterwards. I see this with 3.1.0 and also with the newest stable version, 3.2.1.
Running the enclosed code produces messages such as:

24 Nov 2009 14:44:50,135 ERROR [] com.m1.test.local.ConcurrencyTest Missing value for Key1.

The trace shows that the get was done, and that it returned null:

24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.interceptors.InvocationContextInterceptor Invoked with command GetKeyValueCommand{fqn=/BigNode, key=Key1, sendNodeEvent=true} and InvocationContext [InvocationContext{transaction=null, globalTransaction=null, transactionContext=null, optionOverrides=Option{failSilently=false, cacheModeLocal=false, dataVersion=null, suppressLocking=false, lockAcquisitionTimeout=-1, forceDataGravitation=false, skipDataGravitation=false, forceAsynchronous=false, forceSynchronous=false, suppressPersistence=false, suppressEventNotification=false}, originLocal=true, bypassUnmarshalling=false}]
24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.interceptors.CallInterceptor Executing command: GetKeyValueCommand{fqn=/BigNode, key=Key1, sendNodeEvent=true}.
24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.commands.read.GetKeyValueCommand Found value null

However, earlier in the run I put the key into the cache:

24 Nov 2009 14:44:40,831 INFO [] com.m1.test.local.ConcurrencyTest Putting Key1

And the trace indicates that a value was put for the key:

24 Nov 2009 14:44:40,829 TRACE [] org.jboss.cache.interceptors.InvocationContextInterceptor Invoked with command PutKeyValueCommand{fqn=/BigNode, dataVersion=null, globalTransaction=null, key=Key1, value=This is a test.} and InvocationContext [InvocationContext{transaction=null, globalTransaction=null, transactionContext=null, optionOverrides=Option{failSilently=false, cacheModeLocal=false, dataVersion=null, suppressLocking=false, lockAcquisitionTimeout=-1, forceDataGravitation=false, skipDataGravitation=false, forceAsynchronous=false, forceSynchronous=false, suppressPersistence=false, suppressEventNotification=false}, originLocal=true, bypassUnmarshalling=false}]

These log records are all from the same run of the test code. Since the get happens after the put, I would expect the corresponding value to be returned. I have appended the test, which is a reasonably simple and self-contained case. Is there anything else, perhaps in terms of MVCC options, that I should try?

Thanks,
Alex

| package com.m1.test.local;
|
| import org.jboss.cache.Cache;
| import org.jboss.cache.config.Configuration;
| import java.util.concurrent.CyclicBarrier;
| import org.jboss.cache.DefaultCacheFactory;
| import org.jboss.cache.Fqn;
| import java.util.HashSet;
| import org.apache.commons.logging.Log;
| import org.apache.commons.logging.LogFactory;
|
| /**
|  * This test will be used to put a specific number of entries into the cache,
|  * and measure the amount of memory taken by the cache.
|  */
| public class ConcurrencyTest
| {
|     private static final Fqn<String> FQN = Fqn.fromString("BigNode");
|     private static final Log LOG = LogFactory.getLog(ConcurrencyTest.class);
|     private static final int NTHREADS = 5;
|     private static final String VALUE = "This is a test.";
|
|     private final CyclicBarrier barrier = new CyclicBarrier(NTHREADS);
|     private Thread[] threads = new Thread[NTHREADS];
|
|
|     public ConcurrencyTest()
|         throws Throwable
|     {
|         Cache<Object, Object> cache = createCache();
|
|         for (int iteration=0; iteration<10; iteration++)
|         {
|             for(int j=0; j<NTHREADS; j++)
|             {
|                 threads[j] = new WriteThread(cache, barrier, iteration, j);
|                 threads[j].start();
|             }
|
|             for(int j=0; j<NTHREADS; j++)
|             {
|                 threads[j].join(500);
|             }
|         }
|
|         for (int iteration=0; iteration<10; iteration++)
|         {
|             for(int j=0; j<NTHREADS; j++)
|             {
|                 threads[j] = new ReadThread(cache, barrier, iteration, j);
|                 threads[j].start();
|             }
|
|             for(int j=0; j<NTHREADS; j++)
|             {
|                 threads[j].join(500);
|             }
|         }
|
|         while(true)
|         {
|             Thread.sleep(3600000);
|         }
|     }
|
|     public Cache<Object, Object> createCache()
|         throws Exception
|     {
|         Cache<Object, Object> cache = new DefaultCacheFactory().createCache();
|         cache.create();
|         cache.start();
|         return cache;
|     }
|
|     private static class WriteThread extends Thread
|     {
|         private CyclicBarrier barrier;
|         private Cache<Object, Object> cache;
|         private int iteration;
|         private int writer;
|
|         public WriteThread(Cache<Object, Object> cache, CyclicBarrier barrier,
|                            int iteration, int writer)
|             throws Throwable
|         {
|             this.cache = cache;
|             this.barrier = barrier;
|             this.iteration = iteration;
|             this.writer = writer;
|         }
|
|         /**
|          * Run a common set of tests in each thread.
|          */
|         public void run()
|         {
|             try
|             {
|                 barrier.await();
|
|                 for (int id=1000*iteration + 100*writer; id<1000*iteration + 100*(writer+1); id++)
|                 {
|                     String key = "Key" + id;
|                     cache.put(FQN, key, VALUE);
|                     LOG.info("Putting " + key);
|                 }
|             }
|             catch (Exception exception)
|             {
|                 LOG.error("Write thread failed.", exception);
|             }
|         }
|     }
|
|
|     private static class ReadThread extends Thread
|     {
|         private CyclicBarrier barrier;
|         private Cache<Object, Object> cache;
|         private int iteration;
|         private int writer;
|
|         public ReadThread(Cache<Object, Object> cache, CyclicBarrier barrier,
|                           int iteration, int writer)
|             throws Throwable
|         {
|             this.cache = cache;
|             this.barrier = barrier;
|             this.iteration = iteration;
|             this.writer = writer;
|         }
|
|         /**
|          * Run a common set of tests in each thread.
|          */
|         public void run()
|         {
|             try
|             {
|                 barrier.await();
|
|                 for (int id=1000*iteration + 100*writer; id<1000*iteration + 100*(writer+1); id++)
|                 {
|                     String key = "Key" + id;
|                     Object result = cache.get(FQN, key);
|                     if (result == null)
|                     {
|                         LOG.error("Missing value for " + key + ".");
|                     }
|                 }
|             }
|             catch (Exception exception)
|             {
|                 LOG.error("ReadThread failed.", exception);
|             }
|         }
|     }
|
|     public static void main(String[] args)
|         throws Throwable
|     {
|         ConcurrencyTest test = new ConcurrencyTest();
|     }
| }
|

Some other details:

$ java -version
java version "1.6.0_16"
Java(TM) SE Runtime Environment (build 1.6.0_16-b01)
Java HotSpot(TM) 64-Bit Server VM (build 14.2-b01, mixed mode)

$ uname -a
Linux aklugelnx 2.6.28-16-generic #55-Ubuntu SMP Tue Oct 20 19:48:32 UTC 2009 x86_64 GNU/Linux

The only JVM option I use is -Xmx2g to ensure enough space in the cache. I would welcome suggestions, and even requests for more details or additional tests to further pin down why I don't see what I expect. I'd be happy to find it to be a configuration option I missed.

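A control run may help rule out the harness itself. The sketch below is not part of the test above: it repeats the same write-then-read pattern against a plain java.util.concurrent.ConcurrentHashMap instead of the cache, and waits for each batch of threads with an untimed join() rather than join(500). The names HarnessControl, countMissing and joinAll are illustrative only, and the key-range formula is a generalisation of the one used in the test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

/** Control run (illustrative): same put-then-get pattern, no JBoss Cache involved. */
final class HarnessControl
{
    private static final String VALUE = "This is a test.";

    /** Returns the number of keys that were written but not found on read-back. */
    static int countMissing(final int nThreads, int iterations, final int keysPerThread)
    {
        final ConcurrentMap<String, String> store = new ConcurrentHashMap<String, String>();
        final AtomicInteger missing = new AtomicInteger();
        final AtomicInteger failures = new AtomicInteger();

        // Pass 0 runs all the writer batches, pass 1 runs all the reader batches.
        for (int pass = 0; pass < 2; pass++)
        {
            final boolean writing = (pass == 0);
            for (int iteration = 0; iteration < iterations; iteration++)
            {
                // One barrier per batch so every thread in it starts together.
                final CyclicBarrier barrier = new CyclicBarrier(nThreads);
                List<Thread> threads = new ArrayList<Thread>();
                for (int t = 0; t < nThreads; t++)
                {
                    // Disjoint key range per (iteration, thread) pair.
                    final int first = (iteration * nThreads + t) * keysPerThread;
                    Thread thread = new Thread(new Runnable()
                    {
                        public void run()
                        {
                            try
                            {
                                barrier.await();
                                for (int id = first; id < first + keysPerThread; id++)
                                {
                                    String key = "Key" + id;
                                    if (writing)
                                    {
                                        store.put(key, VALUE);
                                    }
                                    else if (store.get(key) == null)
                                    {
                                        missing.incrementAndGet();
                                    }
                                }
                            }
                            catch (Exception exception)
                            {
                                failures.incrementAndGet();
                            }
                        }
                    });
                    threads.add(thread);
                    thread.start();
                }
                joinAll(threads);
            }
        }

        if (failures.get() > 0)
        {
            throw new IllegalStateException(failures.get() + " worker thread(s) failed");
        }
        return missing.get();
    }

    /** Waits, without a timeout, until every thread in the batch has finished. */
    private static void joinAll(List<Thread> threads)
    {
        for (Thread thread : threads)
        {
            try
            {
                thread.join();
            }
            catch (InterruptedException exception)
            {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", exception);
            }
        }
    }

    public static void main(String[] args)
    {
        System.out.println("Missing keys: " + countMissing(5, 10, 100));
    }
}
```

If this reports zero missing keys while the cache-based test still logs missing values, the thread and barrier logic is unlikely to be the cause.
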
Thanks,
Alex

View the original post: http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4267408#4267408