Author: schor
Date: Fri Apr 25 16:09:00 2014
New Revision: 1590071

URL: http://svn.apache.org/r1590071
Log:
[UIMA-3774] make JCasHashMap thread safe. Add two test cases

Modified:
    uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java
    uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasImpl.java
    uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java
    uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java

Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java?rev=1590071&r1=1590070&r2=1590071&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasHashMap.java Fri Apr 25 16:09:00 2014
@@ -20,8 +20,10 @@
 package org.apache.uima.jcas.impl;
 
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.uima.cas.impl.FeatureStructureImpl;
+import org.apache.uima.jcas.cas.TOP;
 
 /**
  * Version 2 (2014) of map between CAS addr and JCasCover Objects
@@ -48,43 +50,289 @@ import org.apache.uima.cas.impl.FeatureS
  *     reuse on next put 
  *   change put to assume adding new item not already in table
  * 
+ * Multi-threading:  For read-only CASes, multiple iterators in 
+ * different threads could be accessing the map and updating it.
  * 
+ * Strategy: have 1 outer implementation delegating to multiple inner ones
+ *   number = concurrency level (a power of 2)
+ *   
+ *   The hash would use some # of low order bits to address the right inner one. 
  */
 public class JCasHashMap {
   
+  private static final int DEFAULT_CONCURRENCY_LEVEL;
+  static {
+    int cores = Runtime.getRuntime().availableProcessors();
+    DEFAULT_CONCURRENCY_LEVEL = (cores < 17) ? cores :
+                                (cores < 33) ? 16 + (cores - 16) / 2 : 
+                                               24 + (cores - 24) / 4;
+  }
   // set to true to collect statistics for tuning
   // you have to also put a call to jcas.showJfsFromCaddrHistogram() at the end of the run
   private static final boolean TUNE = false;
-  
+    
   //These are for tuning measurements
   private int histogram [];
-  private int nbrProbes;
   private int maxProbe = 0;
-  
-  private int sizeWhichTriggersExpansion;
-  
+
   private final float loadFactor = (float)0.60;
-  
+
   private final int initialCapacity; 
+
+  private final boolean useCache;
   
-  private int size; // number of elements in the table
+  private final int concurrencyLevel;
   
-  private FeatureStructureImpl [] table;
-    
-  // These are for hashing the CAS address
-  private int bitsMask;  // 1's to "and" with result to keep in range 
+  private final int concurrencyBitmask; 
   
-  private final boolean useCache;
+  private final int concurrencyLevelBits;
+  
+  private final SubMap[] subMaps;
+  
+  private final AtomicInteger aggregate_size = new AtomicInteger(0);
+
+  private final int subMapInitialCapacity;
+
+  private class SubMap {
+    private int sizeWhichTriggersExpansion;
+    private int size; // number of elements in the table  
+    private FeatureStructureImpl [] table;
+    private boolean secondTimeShrinkable = false;
+    private int bitsMask;  // 1's to "and" with result to keep in range   
   
-  private boolean secondTimeShrinkable = false;
+    // for testing only:
+    int getbitsMask() {return bitsMask;}
+    
+    private SubMap newTable(int capacity) {
+      assert(Integer.bitCount(capacity) == 1);
+      table = new FeatureStructureImpl[capacity];
+      bitsMask = capacity - 1;
+      size = 0;
+      sizeWhichTriggersExpansion = (int)(capacity * loadFactor);
+      return this;
+    }
+    
+    private synchronized void clear() {
+      // see if size is less than the 1/2 size that triggers expansion
+      if (size <  (sizeWhichTriggersExpansion >>> 1)) {
+        // if 2nd time then shrink by 50%
+        //   this is done to avoid thrashing around the threshold
+        if (secondTimeShrinkable) {
+          secondTimeShrinkable = false;
+          final int newCapacity = Math.max(subMapInitialCapacity, table.length >>> 1);
+          if (newCapacity < table.length) { 
+            newTable(newCapacity);  // shrink table by 50%
+          } else { // don't shrink below minimum
+            Arrays.fill(table,  null);
+          }
+          size = 0;
+          return;
+        } else {
+          secondTimeShrinkable = true;
+        }
+      } else {
+        secondTimeShrinkable = false; // reset this to require 2 triggers in a row
+      }
+      size = 0;
+      Arrays.fill(table, null);
+    }      
+    
+    private synchronized FeatureStructureImpl get(int key, int hash) {
+      int nbrProbes = 1;
+      int probeAddr = hash & bitsMask;
+      int probeDelta = 1;
+      FeatureStructureImpl maybe = table[probeAddr];
+      while ((null != maybe) && (maybe.getAddress() != key)) {
+        if (TUNE) {
+          nbrProbes++;
+        }
+        probeAddr = bitsMask & (probeAddr + (probeDelta++));
+        maybe = table[probeAddr];
+      }  
+
+      if (TUNE) {
+        histogram[Math.min(histogram.length - 1, nbrProbes)]++;
+        maxProbe = Math.max(maxProbe, nbrProbes);
+      }
+      return maybe;    
+    }
+    
+    /**
+     * Gets a value, but if the value isn't there, it reserves the slot where it will go
+     * with a new instance where the key matches, but the type is null
+     * @param key - the addr in the heap
+     * @param hash - the hash that was already computed from the key
+     * @return - the found fs, or null
+     */
+    private synchronized FeatureStructureImpl getReserve(int key, int hash) {
+      int nbrProbes = 1;
+      int probeAddr = hash & bitsMask;
+      int probeDelta = 1;
+      FeatureStructureImpl maybe = table[probeAddr];
+      while ((null != maybe)) {
+        if (maybe.getAddress() == key) {
+          while (((TOP)maybe).jcasType == null) {
+            // we hit a reserve marker - there is another thread in the process of creating an instance of this,
+            // so wait for it to finish and then return it
+            try {
+              wait();  // releases the synchronized monitor, otherwise this segment blocked for others while waiting
+            } catch (InterruptedException e) {
+            }
+            maybe = table[probeAddr];
+          }
+          if (TUNE) {
+            histogram[Math.min(histogram.length - 1, nbrProbes)]++;
+            maxProbe = Math.max(maxProbe, nbrProbes);
+          }
+          return maybe;
+        }
+        // is not null, but is wrong key
+        if (TUNE) {
+          nbrProbes++;
+        }
+        probeAddr = bitsMask & (probeAddr + (probeDelta++));
+        maybe = table[probeAddr];
+      }
+      
+      // maybe is null
+        // reserve this slot to prevent other "getReserve" calls for this same instance from succeeding, 
+        // causing them to wait until this slot gets filled in with a FS value
+      table[probeAddr] = new TOP(key, null);  // null indicates it's a RESERVE marker
+      
+     
+      if (TUNE) {
+        histogram[Math.min(histogram.length - 1, nbrProbes)]++;
+        maxProbe = Math.max(maxProbe, nbrProbes);
+      }
+      return maybe;    
+    }
+
+    
+    private synchronized void put(int key, FeatureStructureImpl value, int hash) {
+      if (size >= sizeWhichTriggersExpansion) {
+        increaseTableCapacity();
+      }
+      size++;
+      
+      int probeAddr = hash & bitsMask;
+      int probeDelta = 1;
+      int nbrProbes = 1;
+      FeatureStructureImpl m = table[probeAddr];
+      while (null != m) {
+        if (((TOP)m).jcasType == null) { 
+          // this slot was previously reserved - check to see if the key matches
+          // (must be same key, otherwise, impl could deadlock)
+          if (m.getAddress() == key) {
+            // found the previously reserved slot
+            table[probeAddr] = value;
+            notifyAll();
+            if (TUNE) {
+              histogram[Math.min(histogram.length - 1, nbrProbes)]++;
+              maxProbe = Math.max(maxProbe, nbrProbes);
+            }
+            return;
+          }
+        }
+            
+        if (TUNE) {
+          nbrProbes++;
+        }
+        probeAddr = bitsMask & (probeAddr + (probeDelta++));
+        m = table[probeAddr];
+      }
+      if (TUNE) {
+        histogram[Math.min(histogram.length - 1, nbrProbes)]++;
+        maxProbe = Math.max(maxProbe, nbrProbes);
+      }
+      table[probeAddr] = value;   
+    }
+    
+//    private synchronized FeatureStructureImpl putIfAbsent(
+//        int key, 
+//        Callable<FeatureStructureImpl> valueProducer, 
+//        int hash) throws Exception {
+//      int nbrProbes = 1;
+//      int probeAddr = hash & bitsMask;
+//      int probeDelta = 1;
+//      FeatureStructureImpl maybe = table[probeAddr];
+//      while ((null != maybe) && (maybe.getAddress() != key)) {
+//        if (TUNE) {
+//          nbrProbes++;
+//        }
+//        probeAddr = bitsMask & (probeAddr + (probeDelta++));
+//        maybe = table[probeAddr];
+//      }
+//      
+//      if (TUNE) {
+//        histogram[Math.min(histogram.length - 1, nbrProbes)]++;
+//        maxProbe = Math.max(maxProbe, nbrProbes);
+//      }
+//      
+//      if (null == maybe) {
+//        table[probeAddr] = maybe = valueProducer.call();
+//        aggregate_size.incrementAndGet();
+//      }
+//      return maybe;    
+//    }
+    
+//    private int findEmptySlot(int key, int hash) {
+//      int probeAddr = hash & bitsMask;
+//      int probeDelta = 1;
+//      int nbrProbes = 1;
+//      FeatureStructureImpl m = table[probeAddr];
+//      while (null != m) {
+//        if (((TOP)m).jcasType == null) { 
+//          // this slot was previously reserved - check to see if the key matches
+//          // (must be same key, otherwise, impl could deadlock)
+//          if (m.getAddress() == key) {
+//            
+//          }
+//        if (TUNE) {
+//          nbrProbes++;
+//        }
+//        probeAddr = bitsMask & (probeAddr + (probeDelta++));
+//      }
+//      if (TUNE) {
+//        histogram[Math.min(histogram.length - 1, nbrProbes)]++;
+//        maxProbe = Math.max(maxProbe, nbrProbes);
+//      }
+//      return probeAddr;
+//    }
 
-  // for testing only:
-  int getbitsMask() {return bitsMask;}
+    private void increaseTableCapacity() {
+      final FeatureStructureImpl [] oldTable = table; 
+      final int oldCapacity = oldTable.length;
+    
+      int newCapacity = 2 * oldCapacity;
+      
+      if (TUNE) {
+        System.out.println("Size increasing from " + oldCapacity + " to " + newCapacity);
+      }
+      newTable(newCapacity);
+      size = 0;
+      for (int i = 0; i < oldCapacity; i++) {
+        FeatureStructureImpl fs = oldTable[i];
+        if (fs != null) {
+          int key = fs.getAddress();
+          int hash = hashInt(key);
+          put(key, fs, hash >>> concurrencyLevelBits);
+        }   
+      }
+    }
+  }
   
   JCasHashMap(int capacity, boolean doUseCache) {
+    this(capacity, doUseCache, DEFAULT_CONCURRENCY_LEVEL);
+  }
+  
+  JCasHashMap(int capacity, boolean doUseCache, int aConcurrencyLevel) {
     this.useCache = doUseCache;
-    capacity = Math.max(32, capacity);
-    int bits = (32 - Integer.numberOfLeadingZeros(capacity - 1));
+    concurrencyLevel = (aConcurrencyLevel > 1) ? 
+        Integer.highestOneBit(1 + aConcurrencyLevel) :  // 1 to 128
+        1;
+    concurrencyBitmask = concurrencyLevel - 1;
+    concurrencyLevelBits = Integer.numberOfTrailingZeros(concurrencyLevel); 
+    capacity = Integer.highestOneBit(1 + Math.max(31, Math.max(concurrencyLevel, capacity)));
     // initialSize = 2, bits = 1, 2
     // initialSize = 3, bits = 2, 4
     // initialSize = 4, bits = 2, 4
@@ -92,128 +340,83 @@ public class JCasHashMap {
     // initialSize = 6, bits = 3
     // initialSize = 7, bits = 3
     // initialSize = 8, bits = 3
-    capacity = 1<<bits;    // rounds up to next power of 32
-    newTable(capacity);
     initialCapacity = capacity;
+    subMaps = new SubMap[concurrencyLevel];
+    subMapInitialCapacity = initialCapacity / concurrencyLevel;  // always 2 or more
+    for (int i = 0; i < concurrencyLevel; i++) {
+      subMaps[i] = (new SubMap()).newTable(subMapInitialCapacity);
+    }
     if (TUNE) {
       histogram = new int[200];
       Arrays.fill(histogram, 0);
     }
   }
-  
-  private void newTable(int capacity) {
-    assert(Integer.bitCount(capacity) == 1);
-    table = new FeatureStructureImpl[capacity];
-    bitsMask = capacity - 1;
-    size = 0;
-    sizeWhichTriggersExpansion = (int)(capacity * loadFactor);
-  }
-    
+      
   // cleared when cas reset
   // storage management:
   //   shrink if current number of entries
   //      wouldn't trigger an expansion if the size was reduced by 1/2 
-  public void clear() {
+  public synchronized void clear() {
     if (!this.useCache) {
       return;
     }
-    // see if size is less than the 1/2 size that triggers expansion
-    if (size <  (sizeWhichTriggersExpansion >>> 1)) {
-      // if 2nd time then shrink by 50%
-      //   this is done to avoid thrashing around the threshold
-      if (secondTimeShrinkable) {
-        secondTimeShrinkable = false;
-        final int newCapacity = Math.max(initialCapacity, table.length >>> 1);
-        if (newCapacity < table.length) { 
-          newTable(newCapacity);  // shrink table by 50%
-        } else { // don't shrink below minimum
-          Arrays.fill(table,  null);
-        }
-        size = 0;
-        return;
-      } else {
-        secondTimeShrinkable = true;
-      }
-    } else {
-      secondTimeShrinkable = false; // reset this to require 2 triggers in a row
+    for (SubMap m : subMaps) {
+      m.clear();
     }
-    size = 0;
-    Arrays.fill(table, null);
+    aggregate_size.set(0);
   }
 
   public FeatureStructureImpl get(int key) {
     if (!this.useCache) {
       return null;
     }
-    int probeAddr = hashInt(key);
-    int probeDelta = 1;
-    FeatureStructureImpl maybe = table[probeAddr];
-    while ((null != maybe) && (maybe.getAddress() != key)) {
-      if (TUNE) {
-        nbrProbes++;
-      }
-      probeAddr = bitsMask & (probeAddr + (probeDelta++));
-      maybe = table[probeAddr];
-    }  
-
-    if (TUNE) {
-      histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-      maxProbe = Math.max(maxProbe, nbrProbes);
-    }
-    // doesn't work - requires no intervening get between save and use
-    // and user code running in readObject() of create instance of JCas cover object
-    //   *could* do something here.
-//    indexOfLastFreeCell = (maybe == null) ? probeAddr : -1;
-    return maybe;    
+    int hash = hashInt(key);
+    int subMapIndex = hash & concurrencyBitmask;
+    
+    SubMap m = subMaps[subMapIndex];
+    return m.get(key, hash >>> concurrencyLevelBits);    
   }
   
-  private int findEmptySlot(int key) {
-    int probeAddr = hashInt(key);
-    int probeDelta = 1;
-    while (null != table[probeAddr]) {
-      if (TUNE) {
-        nbrProbes++;
-      }
-      probeAddr = bitsMask & (probeAddr + (probeDelta++));
-    }
-    if (TUNE) {
-      histogram[Math.min(histogram.length - 1, nbrProbes)]++;
-      maxProbe = Math.max(maxProbe, nbrProbes);
+  public FeatureStructureImpl getReserve(int key) {
+    if (!this.useCache) {
+      return null;
     }
-//    indexOfLastFreeCell = probeAddr;
-    return probeAddr;
+    int hash = hashInt(key);
+    int subMapIndex = hash & concurrencyBitmask;
+    
+    SubMap m = subMaps[subMapIndex];
+    return m.getReserve(key, hash >>> concurrencyLevelBits);    
   }
+
   
-  /**
-   * When put is called, the caller must already have just 
-   * previously used get or findEmptySlot to set the indexOfLastFreeCell
-   * @param value
-   */
-//  public void putAfterFindingEmptyCell(FeatureStructureImpl value) {
+//  public FeatureStructureImpl putIfAbsent(int key, Callable<FeatureStructureImpl> valueProducer) throws Exception {
 //    if (!this.useCache) {
-//      return;
-//    }
-//    if (size >= sizeWhichTriggersExpansion) {
-//      increaseSize();
-//      findEmptySlot(value.getAddress());  //reset the indexOfLastFreeCell
+//      return valueProducer.call();
 //    }
-//    size++;
-//    table[indexOfLastFreeCell] = value;   
+//    int hash = hashInt(key);
+//    int subMapIndex = hash & concurrencyBitmask;
+//    
+//    SubMap m = subMaps[subMapIndex];
+//    return m.putIfAbsent(key, valueProducer, hash >>> concurrencyLevelBits);  
 //  }
+//  
+
   
   public void put(FeatureStructureImpl value) {
     if (!this.useCache) {
       return;
     }
-    if (size >= sizeWhichTriggersExpansion) {
-      increaseTableCapacity();
-    }
-    size++;
-    table[findEmptySlot(value.getAddress())] = value;   
+    int key = value.getAddress();
+    int hash = hashInt(key);
+    int subMapIndex = hash & concurrencyBitmask;
+    
+    SubMap m = subMaps[subMapIndex];
+    m.put(key, value, hash >>> concurrencyLevelBits);
+    aggregate_size.incrementAndGet();
   }
   
   public int size() {
-    return size;
+    return aggregate_size.get();
   }
 
   // The hash function is derived from murmurhash3 32 bit, which
@@ -242,26 +445,16 @@ public class JCasHashMap {
     h1 ^= h1 >>> 13;
     h1 *= 0xc2b2ae35;
     h1 ^= h1 >>> 16;
-    return h1 & bitsMask;
+    return h1;
   }
      
-  private void increaseTableCapacity() {
-    final FeatureStructureImpl [] oldTable = table; 
-    final int oldCapacity = oldTable.length;
-  
-    int newCapacity = 2 * oldCapacity;
-    
-    if (TUNE) {
-      System.out.println("Size increasing from " + oldCapacity + " to " + newCapacity);
-    }
-    newTable(newCapacity);
-    size = 0;
-    for (int i = 0; i < oldCapacity; i++) {
-      FeatureStructureImpl fs = oldTable[i];
-      if (fs != null) {
-        put(fs);
-      }   
+  int[] getCapacities() {
+    int[] r = new int[subMaps.length];
+    int i = 0;
+    for (SubMap subMap : subMaps) {
+      r[i++] = subMap.bitsMask + 1;
     }
+    return r;
   }
   
   public void showHistogram() {
@@ -270,12 +463,16 @@ public class JCasHashMap {
               + maxProbe);
       for (int i = 0; i <= maxProbe; i++) {
         System.out.println(i + ": " + histogram[i]);
-      }      
-      System.out.println("bytes / entry = " + (float) (table.length) * 4 / size);
+      }     
+      int agg_tableLength = 0;
+      for (SubMap m : subMaps) {
+        agg_tableLength += m.table.length;
+      }
+      System.out.println("bytes / entry = " + (float) (agg_tableLength) * 4 / size());
       System.out.format("size = %,d, prevExpansionTriggerSize = %,d, next = %,d%n",
-          size,
-          (int) ((table.length >>> 1) * loadFactor),
-          (int) (table.length * loadFactor));
+          size(),
+          (int) ((agg_tableLength >>> 1) * loadFactor),
+          (int) (agg_tableLength * loadFactor));
     }
   }
 }
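
For orientation when reading the JCasHashMap diff above: the map is now striped into a power-of-two number of SubMaps, defaulting to a value derived from Runtime.getRuntime().availableProcessors(). Each key is hashed once with hashInt(key); the low-order concurrencyLevelBits bits of that hash pick the SubMap, and the remaining bits (hash >>> concurrencyLevelBits) become the starting point for that SubMap's open-address probe. Below is a minimal standalone sketch of that index split, assuming a concurrency level of 8 and using hypothetical names; it is illustrative only, not code from this commit.

public class StripedIndexSketch {

  // e.g. concurrencyLevel = 8 (a power of 2) => 3 selector bits
  private static final int CONCURRENCY_LEVEL = 8;
  private static final int CONCURRENCY_BITMASK = CONCURRENCY_LEVEL - 1;
  private static final int CONCURRENCY_LEVEL_BITS =
      Integer.numberOfTrailingZeros(CONCURRENCY_LEVEL);

  // pick the submap from the low-order bits of the full 32-bit hash
  static int subMapIndex(int hash) {
    return hash & CONCURRENCY_BITMASK;
  }

  // use the remaining bits as the starting probe address inside that submap,
  // masked down to the submap's table size (also a power of 2)
  static int probeAddr(int hash, int subMapTableSize) {
    int bitsMask = subMapTableSize - 1;
    return (hash >>> CONCURRENCY_LEVEL_BITS) & bitsMask;
  }

  public static void main(String[] args) {
    int hash = 0xC3A51234;  // stand-in for a value produced by hashInt(key)
    System.out.println("submap index = " + subMapIndex(hash));   // low 3 bits
    System.out.println("probe start  = " + probeAddr(hash, 64)); // next 6 bits
  }
}

Because each SubMap's get/getReserve/put/clear are synchronized on that SubMap instance, keys whose hashes differ in the low bits land in different SubMaps and never contend for the same monitor, which is the point of the striping. Note also the constructor's rounding: a requested capacity of 200 (as in the new tests) becomes Integer.highestOneBit(1 + 200) = 128 total slots, split evenly across the submaps (16 each at a concurrency level of 8).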

Modified: uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasImpl.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasImpl.java?rev=1590071&r1=1590070&r2=1590071&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasImpl.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/main/java/org/apache/uima/jcas/impl/JCasImpl.java Fri Apr 25 16:09:00 2014
@@ -1041,7 +1041,7 @@ public class JCasImpl extends AbstractCa
    * @see org.apache.uima.jcas.JCas#getJfsFromCaddr(int)
    */
   public TOP getJfsFromCaddr(int casAddr) {
-    return (TOP) sharedView.cAddr2Jfs.get(casAddr);
+    return (TOP) sharedView.cAddr2Jfs.getReserve(casAddr);
   }
 
   public void showJfsFromCaddrHistogram() {
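
The JCasImpl change above switches getJfsFromCaddr from get to getReserve, the read half of a reserve-then-put protocol: getReserve either returns an existing cover object (waiting if another thread is mid-creation), or returns null after writing a reserve marker so that concurrent callers for the same address block until the creating thread calls put. A caller-side sketch of that protocol follows, assuming the JCasHashMap API shown in this commit; GetReserveProtocolSketch, CoverObjectFactory, and getOrCreate are hypothetical illustrations, not part of the commit.

import org.apache.uima.cas.impl.FeatureStructureImpl;
import org.apache.uima.jcas.cas.TOP;
import org.apache.uima.jcas.impl.JCasHashMap;

public class GetReserveProtocolSketch {

  // hypothetical stand-in for however the real caller builds a JCas cover object
  interface CoverObjectFactory {
    TOP create(int casAddr);
  }

  static TOP getOrCreate(JCasHashMap map, int casAddr, CoverObjectFactory factory) {
    FeatureStructureImpl existing = map.getReserve(casAddr);
    if (existing != null) {
      // either the cover object was already cached, or another thread was in the
      // middle of creating it and getReserve waited for that thread's put()
      return (TOP) existing;
    }
    // getReserve returned null: this thread now holds the reservation for casAddr
    // and must create the cover object and publish it with put(), which also
    // wakes any threads blocked in getReserve on the same address
    TOP created = factory.create(casAddr);
    map.put(created);
    return created;
  }
}

The testMultiThreadCollide case added below exercises the blocking half of this protocol: all but one thread park inside getReserve until put fills the reserved slot, after which every thread sees the same instance.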

Modified: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java?rev=1590071&r1=1590070&r2=1590071&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java Fri Apr 25 16:09:00 2014
@@ -35,7 +35,7 @@ public class MultiThreadUtils extends Te
   public final static int PROCESSORS = Runtime.getRuntime().availableProcessors();
   
   public static interface Run2isb {
-    public void call(int i, int r, StringBuilder sb) throws Exception;
+    public void call(int threadNumber, int repeatNumber, StringBuilder sb) throws Exception;
   }
   
   // needed because

Modified: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java?rev=1590071&r1=1590070&r2=1590071&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/jcas/impl/JCasHashMapTest.java Fri Apr 25 16:09:00 2014
@@ -23,14 +23,25 @@ import java.util.Random;
 
 import junit.framework.TestCase;
 
+import org.apache.uima.cas.Type;
+import org.apache.uima.cas.impl.FeatureStructureImpl;
+import org.apache.uima.internal.util.MultiThreadUtils;
+import org.apache.uima.jcas.JCas;
 import org.apache.uima.jcas.cas.TOP;
+import org.apache.uima.jcas.cas.TOP_Type;
 
 /**
  * 
  *
  */
 public class JCasHashMapTest extends TestCase {
+  static private class FakeTopType extends TOP_Type {
+    public FakeTopType() {
+      super();
+    }    
+  }
   
+  static final TOP_Type NULL_TOP_TYPE_INSTANCE = new FakeTopType(); 
   static final int SIZE = 20000;  // set > 2 million for cache avoidance timing tests
   static final long SEED = 12345;
   static Random r = new Random(SEED);
@@ -63,6 +74,106 @@ public class JCasHashMapTest extends Tes
 
   }
   
+  public void testMultiThread() throws Exception {
+    final Random random = new Random();
+    int numberOfThreads = MultiThreadUtils.PROCESSORS;    
+    System.out.format("test JCasHashMap with %d threads", numberOfThreads);
+    
+    final JCasHashMap m = new JCasHashMap(200, true); // true = do use cache 
+
+    MultiThreadUtils.Run2isb run2isb = new MultiThreadUtils.Run2isb() {
+      
+      public void call(int threadNumber, int repeatNumber, StringBuilder sb) {
+        for (int k = 0; k < 4; k++) {
+          for (int i = 0; i < SIZE / 4; i++) {
+            final int key = addrs[random.nextInt(SIZE / 16)];
+            FeatureStructureImpl fs = m.get(key);
+            if (null == fs) {
+              m.put(new TOP(key, NULL_TOP_TYPE_INSTANCE));
+            }
+          }
+          try {
+            Thread.sleep(0, random.nextInt(1000));
+          } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          }
+        }
+//        System.out.println(sb.toString());
+      }
+    };  
+    MultiThreadUtils.tstMultiThread("JCasHashMapTest",  numberOfThreads,  10, run2isb);
+  }
+
+  /**
+   * Create situation
+   *   make a set of indexed fs instances, no JCas
+   *   on multiple threads, simultaneously, attempt to get the jcas cover object for this
+   *     one getReserve should succeed, but reserve, and the others should "wait".
+   *     then put
+   *     then the others should "wakeup" and return the same instance 
+   *   
+   * @throws Exception
+   */
+  public void testMultiThreadCollide() throws Exception {
+    int numberOfThreads = MultiThreadUtils.PROCESSORS;
+    if (numberOfThreads < 2) {
+      return;
+    }
+    System.out.format("test JCasHashMap collide with %d threads", numberOfThreads);
+    final Random r = new Random();
+    final JCasHashMap m = new JCasHashMap(200, true); // true = do use cache 
+    final int hashKey = 15;
+    final TOP fs = new TOP(hashKey, NULL_TOP_TYPE_INSTANCE);
+    
+    final Thread[] threads = new Thread[numberOfThreads];
+    final FeatureStructureImpl[] found = new FeatureStructureImpl[numberOfThreads];
+    
+    for (int i = 0; i < numberOfThreads; i++) {
+      final int finalI = i;
+      threads[i] = new Thread(new Runnable() {
+        public void run() {
+          try {
+            Thread.sleep(r.nextInt(5));
+          } catch (InterruptedException e) {
+          }
+           found[finalI] = m.getReserve(hashKey);
+        }
+      });
+      threads[i].start();
+    }
+    Thread.sleep(100); 
+    // verify that one thread finished, others are waiting
+    int numberWaiting = 0;
+    int threadFinished = -1;
+    for (int i = 0; i < numberOfThreads; i++) {
+      if (threads[i].isAlive()) {
+        numberWaiting ++;
+      } else {
+        threadFinished = i;
+      }
+    }
+    
+    assertEquals(numberOfThreads - 1, numberWaiting);
+    m.put(fs);
+    found[threadFinished] = fs;
+    Thread.sleep(10);  
+    
+    numberWaiting = 0;
+    for (int i = 0; i < numberOfThreads; i++) {
+      if (threads[i].isAlive()) {
+        numberWaiting ++;
+      }
+    }
+    assertEquals(0, numberWaiting);
+    System.out.format("JCasHashMapTest collide,  found = %s%n", intList(found));
+    for (FeatureStructureImpl f : found) {
+      assertTrue(f == fs);
+    }
+  }
+
+
+  
 //  private void arun2(int n) {
 //    JCasHashMap2 m = new JCasHashMap2(200, true); 
 //    assertTrue(m.size() == 0);
@@ -72,7 +183,7 @@ public class JCasHashMapTest extends Tes
 //    
 //    long start = System.currentTimeMillis();
 //    for (int i = 0; i < n; i++) {
-//      TOP fs = new TOP(7 * i, null);
+//      TOP fs = new TOP(7 * i, NULL_TOP_TYPE_INSTANCE);
 //      FeatureStructureImpl v = m.get(fs.getAddress());
 //      if (null == v) {
 //        m.putAtLastProbeAddr(fs);
@@ -91,7 +202,7 @@ public class JCasHashMapTest extends Tes
     long start = System.currentTimeMillis();
     for (int i = 0; i < n; i++) {
       final int key = addrs[i];
-      TOP fs = new TOP(key, null);
+      TOP fs = new TOP(key, NULL_TOP_TYPE_INSTANCE);
 //      FeatureStructureImpl v = m.get(fs.getAddress());
 //      if (null == v) {
 //        m.get(7 * i);
@@ -109,7 +220,7 @@ public class JCasHashMapTest extends Tes
     
     for (int i = 0; i < n; i++) {
       final int key = addrs[i];
-      TOP fs = new TOP(key, null);
+      TOP fs = new TOP(key, NULL_TOP_TYPE_INSTANCE);
 //      FeatureStructureImpl v = m.get(fs.getAddress());
 //      if (null == v) {
 //        m.get(7 * i);
@@ -130,43 +241,77 @@ public class JCasHashMapTest extends Tes
   }
   
   public void testGrowth() {
+    System.out.println("JCasHashMapTest growth");
+    int cores = Runtime.getRuntime().availableProcessors();
     double loadfactor = .6;
-    JCasHashMap m = new JCasHashMap(64, true); // true = do use cache 
+    int sub_capacity = 64;
+    int agg_capacity = cores * sub_capacity;
+    JCasHashMap m = new JCasHashMap(agg_capacity, true); // true = do use cache 
     assertTrue(m.size() == 0);
      
-    int switchpoint = (int)Math.floor(64 * loadfactor);
+    int switchpoint = (int)Math.floor(agg_capacity * loadfactor);
     fill(switchpoint, m);
-    assertTrue(m.getbitsMask() == 63);
+    assertTrue(checkSubsCapacity(m, 64, 128));
+    System.out.println("JCasHashMapTest growth - adding 1 past switchpoint");
     m.put(new TOP(addrs[switchpoint + 1], null));
-    assertTrue(m.getbitsMask() == 127);
+    assertTrue(checkSubsCapacity(m, 64, 128));
     
     m.clear();
-    assertTrue(m.getbitsMask() == 127);
+    assertTrue(checkSubsCapacity(m, 64, 128));
+
 
     fill(switchpoint, m);
-    assertTrue(m.getbitsMask() == 127);
+    assertTrue(checkSubsCapacity(m, 64, 128));
     m.put(new TOP(addrs[switchpoint + 1], null));
-    assertTrue(m.getbitsMask() == 127);
+    assertTrue(checkSubsCapacity(m, 64, 128));
 
     m.clear();  // size is above switchpoint, so no shrinkage
-    assertTrue(m.getbitsMask() == 127);
+    assertTrue(checkSubsCapacity(m, 64, 128));
     m.clear();  // size is 0, so first time shrinkage a possibility
-    assertTrue(m.getbitsMask() == 127);  // but we don't shrink on first time
+    assertTrue(checkSubsCapacity(m, 64, 128)); // but we don't shrink on first time
     m.clear(); 
-    assertTrue(m.getbitsMask() == 63);  // but we do on second time
+    assertTrue(checkSubsCapacity(m, 64, 64));  // but we do on second time
     m.clear(); 
-    assertTrue(m.getbitsMask() == 63);  
+    assertTrue(checkSubsCapacity(m, 64, 64));
     m.clear(); 
-    assertTrue(m.getbitsMask() == 63);  // don't shrink below minimum
-
-    
+    assertTrue(checkSubsCapacity(m, 64, 64));  // don't shrink below minimum
   }
 
+  private boolean checkSubsCapacity(JCasHashMap m, int v, int v2) {
+    int[] caps = m.getCapacities();
+    for (int i : caps) {
+      if (i == v || i == v2 ) {
+        continue;
+      }
+      System.err.format("JCasHashMapTest: expected %d or %d, but got %s%n", v, v2, intList(caps));
+      return false;
+    }
+    System.out.format("JCasHashMapTest: capacities are: %s%n", intList(caps));
+    return true;
+  }
+  
+  private String intList(int[] a) {
+    StringBuilder sb = new StringBuilder();
+    for (int i : a) {
+      sb.append(i).append(", ");
+    }
+    return sb.toString();
+  }
+  
+  private String intList(FeatureStructureImpl[] a) {
+    StringBuilder sb = new StringBuilder();
+    for (FeatureStructureImpl i : a) {
+      sb.append(i == null ? "null" : i.getAddress()).append(", ");
+    }
+    return sb.toString();
+  }
+  
   private void fill (int n, JCasHashMap m) {
     for (int i = 0; i < n; i++) {
       final int key = addrs[i];
-      TOP fs = new TOP(key, null);
+      TOP fs = new TOP(key, NULL_TOP_TYPE_INSTANCE);
       m.put(fs);
+//      System.out.format("JCasHashMapTest fill %s%n",  intList(m.getCapacities()));
     }
   }
 }

