Hi,

   In running some concurrency tests, where multiple threads do simultaneous
puts against a cache, some of the puts appear to be lost. I run a set of five
threads to do puts, then run a set of five threads to do gets. Many times, not
all of the keys that were put into the cache are present in it. I see this
with 3.1.0, and also with the newest stable version, 3.2.1.

  Running the appended code produces messages such as:
    24 Nov 2009 14:44:50,135 ERROR [] com.m1.test.local.ConcurrencyTest Missing value for Key1.

  The trace shows that the get was done, and that it returned null.
  24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.interceptors.InvocationContextInterceptor Invoked with command GetKeyValueCommand{fqn=/BigNode, key=Key1, sendNodeEvent=true} and InvocationContext [InvocationContext{transaction=null, globalTransaction=null, transactionContext=null, optionOverrides=Option{failSilently=false, cacheModeLocal=false, dataVersion=null, suppressLocking=false, lockAcquisitionTimeout=-1, forceDataGravitation=false, skipDataGravitation=false, forceAsynchronous=false, forceSynchronous=false, suppressPersistence=false, suppressEventNotification=false}, originLocal=true, bypassUnmarshalling=false}]
  24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.interceptors.CallInterceptor Executing command: GetKeyValueCommand{fqn=/BigNode, key=Key1, sendNodeEvent=true}.
  24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.commands.read.GetKeyValueCommand Found value null

  However, earlier in the run I put the key into the cache:

  24 Nov 2009 14:44:40,831 INFO  [] com.m1.test.local.ConcurrencyTest Putting Key1

  And the trace indicates that a value was put for the key:

  24 Nov 2009 14:44:40,829 TRACE [] org.jboss.cache.interceptors.InvocationContextInterceptor Invoked with command PutKeyValueCommand{fqn=/BigNode, dataVersion=null, globalTransaction=null, key=Key1, value=This is a test.} and InvocationContext [InvocationContext{transaction=null, globalTransaction=null, transactionContext=null, optionOverrides=Option{failSilently=false, cacheModeLocal=false, dataVersion=null, suppressLocking=false, lockAcquisitionTimeout=-1, forceDataGravitation=false, skipDataGravitation=false, forceAsynchronous=false, forceSynchronous=false, suppressPersistence=false, suppressEventNotification=false}, originLocal=true, bypassUnmarshalling=false}]

  These log records are all from the same run of the test code. Since the get
happens after the put, I would expect the corresponding value to be returned.
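
  One way to cross-check the log excerpts above over a whole run is to collect
every key that was logged as put and every key that was logged as missing, and
intersect the two sets. The following is only a sketch: the class and method
names (LogCrossCheck, putButMissing) are made up for illustration, the two
message formats are taken from the LOG.info and LOG.error calls in the test,
and it assumes each log record sits on a single line in the real log file. It
does not compare timestamps, so it only shows that both records exist for a key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogCrossCheck
{
    // Message formats as produced by the test's LOG.info / LOG.error calls.
    private static final Pattern PUT     = Pattern.compile("Putting (Key\\d+)");
    private static final Pattern MISSING = Pattern.compile("Missing value for (Key\\d+)\\.");

    /** Returns the keys that appear both in a "Putting" and in a "Missing value" record. */
    public static Set<String> putButMissing(List<String> logLines)
    {
        Set<String> put     = new TreeSet<String>();
        Set<String> missing = new TreeSet<String>();
        for (String line : logLines)
        {
            Matcher m = PUT.matcher(line);
            if (m.find())
            {
                put.add(m.group(1));
            }
            m = MISSING.matcher(line);
            if (m.find())
            {
                missing.add(m.group(1));
            }
        }
        missing.retainAll(put);   // keep only keys that were also logged as put
        return missing;
    }

    public static void main(String[] args)
    {
        // The two application-level records quoted in this post.
        List<String> sample = Arrays.asList(
            "24 Nov 2009 14:44:40,831 INFO  [] com.m1.test.local.ConcurrencyTest Putting Key1",
            "24 Nov 2009 14:44:50,135 ERROR [] com.m1.test.local.ConcurrencyTest Missing value for Key1.");
        System.out.println(putButMissing(sample));   // prints [Key1]
    }
}
```

  Fed with the full log file instead of the two sample lines, this would list
every key for which a put was logged but the later get came back null.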

  I have appended the test, which is a reasonably simple and self-contained
case. Is there anything else, perhaps in terms of MVCC options, that I should
try?

                                                             Thanks,
                                                                      Alex


  | package com.m1.test.local;
  | 
  | import org.jboss.cache.Cache;
  | import org.jboss.cache.config.Configuration;
  | import java.util.concurrent.CyclicBarrier;
  | import org.jboss.cache.DefaultCacheFactory;
  | import org.jboss.cache.Fqn;
  | import java.util.HashSet;
  | import org.apache.commons.logging.Log;
  | import org.apache.commons.logging.LogFactory;
  | 
  | /**
  |  * This test will be used to put a specific number of entries into the
  |  * cache, and measure the amount of memory taken by the cache.
  |  */
  | public class ConcurrencyTest
  | {
  |     private static final Fqn<String> FQN              = Fqn.fromString("BigNode");
  |     private static final Log         LOG              = LogFactory.getLog(ConcurrencyTest.class);
  |     private static final int         NTHREADS         = 5;
  |     private static final String      VALUE            = "This is a test.";
  | 
  |     private final CyclicBarrier barrier       = new CyclicBarrier(NTHREADS);
  |     private       Thread[]      threads       = new Thread[NTHREADS];
  | 
  | 
  |     public ConcurrencyTest()
  |            throws Throwable
  |     {
  |         Cache<Object, Object> cache = createCache();
  | 
  |         for (int iteration=0; iteration<10; iteration++)
  |         {
  |             for(int j=0; j<NTHREADS; j++)
  |             {
  |                 threads[j] = new WriteThread(cache, barrier, iteration, j);
  |                 threads[j].start();
  |             }
  | 
  |             for(int j=0; j<NTHREADS; j++)
  |             {
  |                 threads[j].join(500);
  |             }
  |         }
  | 
  |         for (int iteration=0; iteration<10; iteration++)
  |         {
  |             for(int j=0; j<NTHREADS; j++)
  |             {
  |                 threads[j] = new ReadThread(cache, barrier, iteration, j);
  |                 threads[j].start();
  |             }
  | 
  |             for(int j=0; j<NTHREADS; j++)
  |             {
  |                 threads[j].join(500);
  |             }
  |         }
  | 
  |         while(true)
  |         {
  |             Thread.sleep(3600000);
  |         }
  |     }
  | 
  |     public Cache<Object, Object> createCache()
  |            throws Exception
  |     {
  |         Cache<Object, Object> cache = new DefaultCacheFactory().createCache();
  |         cache.create();
  |         cache.start();
  |         return cache;
  |     }
  |     
  |     private static class WriteThread extends Thread
  |     {
  |         private       CyclicBarrier         barrier;
  |         private       Cache<Object, Object> cache;
  |         private       int                   iteration;
  |         private       int                   writer;
  | 
  |         public WriteThread(Cache<Object, Object> cache,     CyclicBarrier barrier,
  |                            int                   iteration, int           writer)
  |                throws Throwable
  |         {
  |             this.cache     = cache;
  |             this.barrier   = barrier;
  |             this.iteration = iteration;
  |             this.writer    = writer;
  |         }
  | 
  |         /**
  |          * Run a common set of tests in each thread.
  |          */
  |         public void run()
  |         {
  |             try
  |             {
  |               barrier.await();
  | 
  |               for (int id=1000*iteration + 100*writer; id<1000*iteration + 100*(writer+1); id++)
  |               {
  |                 String key = "Key" + id;
  |                 cache.put(FQN, key, VALUE);
  |                 LOG.info("Putting " + key);
  |               }
  |             }
  |             catch (Exception exception)
  |             {
  |               LOG.error("Write thread failed.", exception);
  |             }
  |         }
  |     }
  | 
  | 
  |     private static class ReadThread extends Thread
  |     {
  |         private       CyclicBarrier         barrier;
  |         private       Cache<Object, Object> cache;
  |         private       int                   iteration;
  |         private       int                   writer;
  | 
  |         public ReadThread(Cache<Object, Object> cache,     CyclicBarrier barrier,
  |                           int                   iteration, int           writer)
  |                throws Throwable
  |         {
  |             this.cache     = cache;
  |             this.barrier   = barrier;
  |             this.iteration = iteration;
  |             this.writer    = writer;
  |         }
  | 
  |         /**
  |          * Run a common set of tests in each thread.
  |          */
  |         public void run()
  |         {
  |             try
  |             {
  |               barrier.await();
  | 
  |               for (int id=1000*iteration + 100*writer; id<1000*iteration + 100*(writer+1); id++)
  |               {
  |                 String key = "Key" + id;
  |                 Object result = cache.get(FQN, key);
  |                 if (result == null)
  |                 {
  |                   LOG.error("Missing value for " + key + ".");
  |                 }
  |               }
  |             }
  |             catch (Exception exception)
  |             {
  |               LOG.error("ReadThread failed.", exception);
  |             }
  |         }
  |     }
  | 
  |     public static void main(String[] args)
  |                        throws Throwable
  |     {
  |         ConcurrencyTest test = new ConcurrencyTest();
  |     }
  | }
  | 
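
To rule out the test harness itself, the same write-then-read pattern can be run
against a plain map, with an untimed join() so that every phase is known to have
finished before the next one starts (join(500) returns after 500 ms even if the
thread is still running). This is only a sketch under stated assumptions: the
class name JoinAllHarness is invented, and a ConcurrentHashMap stands in for the
JBoss Cache node, so it says nothing about the cache's own behaviour. It only
shows that the thread and key layout of the test should find every key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

public class JoinAllHarness
{
    static final int NTHREADS = 5;

    /** Runs NTHREADS workers for one iteration and waits until all have finished. */
    static void runPhase(final Map<String, String> store, final boolean write,
                         final int iteration, final List<String> missing)
            throws InterruptedException
    {
        final CyclicBarrier barrier = new CyclicBarrier(NTHREADS);
        Thread[] threads = new Thread[NTHREADS];
        for (int j = 0; j < NTHREADS; j++)
        {
            final int writer = j;
            threads[j] = new Thread(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        barrier.await();   // start all workers together
                        int start = 1000 * iteration + 100 * writer;
                        for (int id = start; id < start + 100; id++)
                        {
                            String key = "Key" + id;
                            if (write)
                            {
                                store.put(key, "This is a test.");
                            }
                            else if (store.get(key) == null)
                            {
                                missing.add(key);
                            }
                        }
                    }
                    catch (Exception e)
                    {
                        missing.add("worker failed: " + e);
                    }
                }
            });
            threads[j].start();
        }
        for (Thread t : threads)
        {
            t.join();   // no timeout: block until the worker is done
        }
    }

    /** Writes all keys, then reads them back; returns the keys that were not found. */
    static List<String> run()
    {
        // Stand-in for the cache (assumption, not the JBoss Cache API).
        Map<String, String> store = new ConcurrentHashMap<String, String>();
        List<String> missing = Collections.synchronizedList(new ArrayList<String>());
        try
        {
            for (int i = 0; i < 10; i++) runPhase(store, true, i, missing);
            for (int i = 0; i < 10; i++) runPhase(store, false, i, missing);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return missing;
    }

    public static void main(String[] args)
    {
        System.out.println("Missing keys: " + run().size());
    }
}
```

With the stand-in map this reports no missing keys. Swapping the map calls for
cache.put(FQN, key, VALUE) and cache.get(FQN, key) would show whether the lost
puts still occur once the join timeout is out of the picture.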

Some other details:

$ java -version
java version "1.6.0_16"
Java(TM) SE Runtime Environment (build 1.6.0_16-b01)
Java HotSpot(TM) 64-Bit Server VM (build 14.2-b01, mixed mode)

$ uname -a
Linux aklugelnx 2.6.28-16-generic #55-Ubuntu SMP Tue Oct 20 19:48:32 UTC 2009 x86_64 GNU/Linux

The only JVM option I use is -Xmx2g to ensure enough space in the cache.

I would welcome suggestions, and even requests for more details or some
additional tests to further pin down why I don't see what I expect. I'd be
happy to find that it is a configuration option I missed.

                                                                 Thanks,
                                                                           Alex



  

View the original post : 
http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4267408#4267408

_______________________________________________
jboss-user mailing list
jboss-user@lists.jboss.org
https://lists.jboss.org/mailman/listinfo/jboss-user