HBASE-14712 Increase MasterProcWALs clean up granularity

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7b80c803
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7b80c803
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7b80c803

Branch: refs/heads/hbase-12439
Commit: 7b80c803b70008a1e5cf593fc302535b451400fe
Parents: 0af651b
Author: Matteo Bertozzi <matteo.berto...@cloudera.com>
Authored: Mon Nov 9 09:33:05 2015 -0800
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Mon Nov 9 09:34:51 2015 -0800

----------------------------------------------------------------------
 .../procedure2/store/ProcedureStoreTracker.java | 56 +++++++++++--
 .../procedure2/store/wal/ProcedureWALFile.java  | 15 ++++
 .../store/wal/ProcedureWALFormatReader.java     | 23 +++++-
 .../procedure2/store/wal/WALProcedureStore.java | 87 ++++++++++++++++----
 .../store/TestProcedureStoreTracker.java        | 31 +++++++
 .../store/wal/TestWALProcedureStore.java        | 65 +++++++++++++++
 6 files changed, 255 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7b80c803/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 07fb026..8516f61 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -44,13 +44,16 @@ public class ProcedureStoreTracker {
   private boolean keepDeletes = false;
   private boolean partial = false;
 
+  private long minUpdatedProcId = Long.MAX_VALUE;
+  private long maxUpdatedProcId = Long.MIN_VALUE;
+
   public enum DeleteState { YES, NO, MAYBE }
 
   public static class BitSetNode {
     private final static long WORD_MASK = 0xffffffffffffffffL;
     private final static int ADDRESS_BITS_PER_WORD = 6;
     private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
-    private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD;
+    private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
 
     private final boolean partial;
     private long[] updated;
@@ -81,7 +84,7 @@ public class ProcedureStoreTracker {
     public BitSetNode(final long procId, final boolean partial) {
       start = alignDown(procId);
 
-      int count = 2;
+      int count = 1;
       updated = new long[count];
       deleted = new long[count];
       for (int i = 0; i < count; ++i) {
@@ -141,8 +144,7 @@ public class ProcedureStoreTracker {
     public boolean isUpdated() {
       // TODO: cache the value
       for (int i = 0; i < updated.length; ++i) {
-        long deleteMask = ~deleted[i];
-        if ((updated[i] & deleteMask) != (WORD_MASK & deleteMask)) {
+        if ((updated[i] | deleted[i]) != WORD_MASK) {
           return false;
         }
       }
@@ -171,6 +173,16 @@ public class ProcedureStoreTracker {
       }
     }
 
+    public void unsetPartialFlag() {
+      for (int i = 0; i < updated.length; ++i) {
+        for (int j = 0; j < BITS_PER_WORD; ++j) {
+          if ((updated[i] & (1L << j)) == 0) {
+            deleted[i] |= (1L << j);
+          }
+        }
+      }
+    }
+
     public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
       ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
         ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
@@ -360,6 +372,7 @@ public class ProcedureStoreTracker {
   public void insert(long procId) {
     BitSetNode node = getOrCreateNode(procId);
     node.update(procId);
+    trackProcIds(procId);
   }
 
   public void update(long procId) {
@@ -369,6 +382,7 @@ public class ProcedureStoreTracker {
     BitSetNode node = entry.getValue();
     assert node.contains(procId);
     node.update(procId);
+    trackProcIds(procId);
   }
 
   public void delete(long procId) {
@@ -383,6 +397,21 @@ public class ProcedureStoreTracker {
       // TODO: RESET if (map.size() == 1)
       map.remove(entry.getKey());
     }
+
+    trackProcIds(procId);
+  }
+
+  private void trackProcIds(long procId) {
+    minUpdatedProcId = Math.min(minUpdatedProcId, procId);
+    maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
+  }
+
+  public long getUpdatedMinProcId() {
+    return minUpdatedProcId;
+  }
+
+  public long getUpdatedMaxProcId() {
+    return maxUpdatedProcId;
   }
 
   @InterfaceAudience.Private
@@ -394,11 +423,12 @@ public class ProcedureStoreTracker {
 
   public void clear() {
     this.map.clear();
+    resetUpdates();
   }
 
   public DeleteState isDeleted(long procId) {
     Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
-    if (entry != null) {
+    if (entry != null && entry.getValue().contains(procId)) {
       BitSetNode node = entry.getValue();
       DeleteState state = node.isDeleted(procId);
       return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state;
@@ -426,6 +456,11 @@ public class ProcedureStoreTracker {
   }
 
   public void setPartialFlag(boolean isPartial) {
+    if (this.partial && !isPartial) {
+      for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+        entry.getValue().unsetPartialFlag();
+      }
+    }
     this.partial = isPartial;
   }
 
@@ -447,10 +482,17 @@ public class ProcedureStoreTracker {
     return true;
   }
 
+  public boolean isTracking(long minId, long maxId) {
+    // TODO: we can make it more precise, instead of looking just at the block
+    return map.floorEntry(minId) != null || map.floorEntry(maxId) != null;
+  }
+
   public void resetUpdates() {
     for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
       entry.getValue().resetUpdates();
     }
+    minUpdatedProcId = Long.MAX_VALUE;
+    maxUpdatedProcId = Long.MIN_VALUE;
   }
 
   public void undeleteAll() {
@@ -527,6 +569,8 @@ public class ProcedureStoreTracker {
 
   public void dump() {
     System.out.println("map " + map.size());
+    System.out.println("isUpdated " + isUpdated());
+    System.out.println("isEmpty " + isEmpty());
     for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
       entry.getValue().dump();
     }
@@ -550,4 +594,4 @@ public class ProcedureStoreTracker {
       map.put(node.getStart(), node);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b80c803/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 859b3cb..4f8a493 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -46,6 +46,8 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
   private FileSystem fs;
   private Path logFile;
   private long startPos;
+  private long minProcId;
+  private long maxProcId;
 
   public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
     this.fs = fs;
@@ -127,6 +129,19 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
     fs.delete(logFile, false);
   }
 
+  public void setProcIds(long minId, long maxId) {
+    this.minProcId = minId;
+    this.maxProcId = maxId;
+  }
+
+  public long getMinProcId() {
+    return minProcId;
+  }
+
+  public long getMaxProcId() {
+    return maxProcId;
+  }
+
   @Override
   public int compareTo(final ProcedureWALFile other) {
     long diff = header.getLogId() - other.header.getLogId();

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b80c803/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 4ca75a7..958241c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -150,8 +150,8 @@ public class ProcedureWALFormatReader {
       LOG.info("No active entry found in state log " + log + ". removing it");
       loader.removeLog(log);
     } else {
+      log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
       procedureMap.mergeTail(localProcedureMap);
-
       //if (hasFastStartSupport) {
         // TODO: Some procedure may be already runnables (see readInitEntry())
         //       (we can also check the "update map" in the log trackers)
@@ -321,6 +321,10 @@ public class ProcedureWALFormatReader {
     // pending unlinked children (root not present yet)
     private Entry childUnlinkedHead;
 
+    // Track ProcId range
+    private long minProcId = Long.MAX_VALUE;
+    private long maxProcId = Long.MIN_VALUE;
+
     public WalProcedureMap(int size) {
       procedureMap = new Entry[size];
       replayOrderHead = null;
@@ -330,6 +334,7 @@ public class ProcedureWALFormatReader {
     }
 
     public void add(ProcedureProtos.Procedure procProto) {
+      trackProcIds(procProto.getProcId());
       Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
       boolean isNew = entry.proto == null;
       entry.proto = procProto;
@@ -345,6 +350,7 @@ public class ProcedureWALFormatReader {
     }
 
     public boolean remove(long procId) {
+      trackProcIds(procId);
       Entry entry = removeFromMap(procId);
       if (entry != null) {
         unlinkFromReplayList(entry);
@@ -354,6 +360,19 @@ public class ProcedureWALFormatReader {
       return false;
     }
 
+    private void trackProcIds(long procId) {
+      minProcId = Math.min(minProcId, procId);
+      maxProcId = Math.max(maxProcId, procId);
+    }
+
+    public long getMinProcId() {
+      return minProcId;
+    }
+
+    public long getMaxProcId() {
+      return maxProcId;
+    }
+
     public boolean contains(long procId) {
       return getProcedure(procId) != null;
     }
@@ -370,6 +389,8 @@ public class ProcedureWALFormatReader {
       replayOrderTail = null;
       rootHead = null;
       childUnlinkedHead = null;
+      minProcId = Long.MAX_VALUE;
+      maxProcId = Long.MIN_VALUE;
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b80c803/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 8764ff0..a3115f8 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -100,6 +100,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
   private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
+  private final AtomicLong inactiveLogsMaxId = new AtomicLong(0);
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition waitCond = lock.newCondition();
   private final Condition slotCond = lock.newCondition();
@@ -225,6 +226,14 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return storeTracker;
   }
 
+  public LinkedList<ProcedureWALFile> getActiveLogs() {
+    return logs;
+  }
+
+  public Set<ProcedureWALFile> getCorruptedLogs() {
+    return corruptedLogs;
+  }
+
   @Override
   public void recoverLease() throws IOException {
     LOG.info("Starting WAL Procedure Store lease recovery");
@@ -386,7 +395,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
 
     if (removeOldLogs) {
-      removeAllLogs(logId - 1);
+      setInactiveLogsMaxId(logId - 1);
     }
   }
 
@@ -426,7 +435,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
 
     if (removeOldLogs) {
-      removeAllLogs(logId);
+      setInactiveLogsMaxId(logId);
     }
   }
 
@@ -499,6 +508,18 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return syncException.get() != null;
   }
 
+  protected void periodicRoll() throws IOException {
+    long logId;
+    boolean removeOldLogs;
+    synchronized (storeTracker) {
+      logId = flushLogId;
+      removeOldLogs = storeTracker.isEmpty();
+    }
+    if (checkAndTryRoll() && removeOldLogs) {
+      setInactiveLogsMaxId(logId);
+    }
+  }
+
   private void syncLoop() throws Throwable {
     inSync.set(false);
     lock.lock();
@@ -507,6 +528,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
         try {
           // Wait until new data is available
           if (slotIndex == 0) {
+            removeInactiveLogs();
+
             if (LOG.isTraceEnabled()) {
               float rollTsSec = getMillisFromLastRoll() / 1000.0f;
               LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
@@ -516,8 +539,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
             waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
             if (slotIndex == 0) {
-              // no data.. probably a stop()
-              checkAndTryRoll();
+              // no data.. probably a stop() or a periodic roll
+              periodicRoll();
               continue;
             }
           }
@@ -724,7 +747,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
     try {
       if (stream != null) {
         try {
-          ProcedureWALFormat.writeTrailer(stream, storeTracker);
+          synchronized (storeTracker) {
+            ProcedureWALFile log = logs.getLast();
+            log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
+            ProcedureWALFormat.writeTrailer(stream, storeTracker);
+          }
         } catch (IOException e) {
           LOG.warn("Unable to write the trailer: " + e.getMessage());
         }
@@ -737,21 +764,51 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
   }
 
-  private void removeAllLogs(long lastLogId) {
-    if (logs.size() <= 1) {
-      assert logs.size() == 1: "Expected at least one active log to be running.";
-      return;
+  // ==========================================================================
+  //  Log Files cleaner helpers
+  // ==========================================================================
+  private void setInactiveLogsMaxId(long logId) {
+    long expect = 0;
+    while (!inactiveLogsMaxId.compareAndSet(expect, logId)) {
+      expect = inactiveLogsMaxId.get();
+      if (expect >= logId) {
+        break;
+      }
     }
+  }
+
+  private void removeInactiveLogs() {
+    long lastLogId = inactiveLogsMaxId.get();
+    if (lastLogId != 0) {
+      removeAllLogs(lastLogId);
+      inactiveLogsMaxId.compareAndSet(lastLogId, 0);
+    }
+
+    // Verify if the ProcId of the first oldest is still active. if not remove the file.
+    while (logs.size() > 1) {
+      ProcedureWALFile log = logs.getFirst();
+      synchronized (storeTracker) {
+        if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) {
+          break;
+        }
+      }
+      removeLogFile(log);
+    }
+  }
+
+  private void removeAllLogs(long lastLogId) {
+    if (logs.size() <= 1) return;
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Remove all state logs with ID less than " + lastLogId);
     }
-    do {
+    while (logs.size() > 1) {
       ProcedureWALFile log = logs.getFirst();
       if (lastLogId < log.getLogId()) {
         break;
       }
       removeLogFile(log);
-    } while(!logs.isEmpty());
+    }
   }
 
   private boolean removeLogFile(final ProcedureWALFile log) {
@@ -761,6 +818,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
       }
       log.removeFile();
       logs.remove(log);
+      LOG.info("Remove log: " + log);
+      LOG.info("Removed logs: " + logs);
+      if (logs.size() == 0) { LOG.error("Expected at least one log"); }
+      assert logs.size() > 0 : "expected at least one log";
     } catch (IOException e) {
       LOG.error("Unable to remove log: " + log, e);
       return false;
@@ -768,10 +829,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return true;
   }
 
-  public Set<ProcedureWALFile> getCorruptedLogs() {
-    return corruptedLogs;
-  }
-
   // ==========================================================================
   //  FileSystem Log Files helpers
   // ==========================================================================

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b80c803/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
index 054da99..7d6eb1c 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.procedure2.store;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -124,6 +125,36 @@ public class TestProcedureStoreTracker {
     assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(579));
     assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(577));
     assertEquals(ProcedureStoreTracker.DeleteState.MAYBE, tracker.isDeleted(580));
+
+    tracker.setDeleted(579, true);
+    tracker.setPartialFlag(false);
+    assertTrue(tracker.isEmpty());
+  }
+
+  @Test
+  public void testIsTracking() {
+    long[][] procIds = new long[][] {{4, 7}, {1024, 1027}, {8192, 8194}};
+    long[][] checkIds = new long[][] {{2, 8}, {1023, 1025}, {8193, 8191}};
+
+    ProcedureStoreTracker tracker = new ProcedureStoreTracker();
+    for (int i = 0; i < procIds.length; ++i) {
+      long[] seq = procIds[i];
+      tracker.insert(seq[0]);
+      tracker.insert(seq[1]);
+    }
+
+    for (int i = 0; i < procIds.length; ++i) {
+      long[] check = checkIds[i];
+      long[] seq = procIds[i];
+      assertTrue(tracker.isTracking(seq[0], seq[1]));
+      assertTrue(tracker.isTracking(check[0], check[1]));
+      tracker.delete(seq[0]);
+      tracker.delete(seq[1]);
+      assertFalse(tracker.isTracking(seq[0], seq[1]));
+      assertFalse(tracker.isTracking(check[0], check[1]));
+    }
+
+    assertTrue(tracker.isEmpty());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b80c803/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 19396aa..6e69ca1 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -27,6 +27,8 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -97,6 +99,15 @@ public class TestWALProcedureStore {
   }
 
   @Test
+  public void testEmptyRoll() throws Exception {
+    for (int i = 0; i < 10; ++i) {
+      procStore.periodicRoll();
+    }
+    FileStatus[] status = fs.listStatus(logDir);
+    assertEquals(1, status.length);
+  }
+
+  @Test
   public void testEmptyLogLoad() throws Exception {
     LoadCounter loader = new LoadCounter();
     storeRestart(loader);
@@ -354,6 +365,60 @@ public class TestWALProcedureStore {
     });
   }
 
+  @Test
+  public void testInsertUpdateDelete() throws Exception {
+    final int NTHREAD = 2;
+
+    procStore.stop(false);
+    fs.delete(logDir, true);
+
+    org.apache.hadoop.conf.Configuration conf =
+      new org.apache.hadoop.conf.Configuration(htu.getConfiguration());
+    conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
+    conf.setInt("hbase.procedure.store.wal.periodic.roll.msec", 10000);
+    conf.setInt("hbase.procedure.store.wal.roll.threshold", 128 * 1024);
+
+    fs.mkdirs(logDir);
+    procStore = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
+    procStore.start(NTHREAD);
+    procStore.recoverLease();
+
+    final long LAST_PROC_ID = 9999;
+    final Thread[] thread = new Thread[NTHREAD];
+    final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100));
+    for (int i = 0; i < thread.length; ++i) {
+      thread[i] = new Thread() {
+        @Override
+        public void run() {
+          Random rand = new Random();
+          TestProcedure proc;
+          do {
+            proc = new TestProcedure(procCounter.addAndGet(1));
+            // Insert
+            procStore.insert(proc, null);
+            // Update
+            for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
+              try { Thread.sleep(0, rand.nextInt(15)); } catch (InterruptedException e) {}
+              procStore.update(proc);
+            }
+            // Delete
+            procStore.delete(proc.getProcId());
+          } while (proc.getProcId() < LAST_PROC_ID);
+        }
+      };
+      thread[i].start();
+    }
+
+    for (int i = 0; i < thread.length; ++i) {
+      thread[i].join();
+    }
+
+    procStore.getStoreTracker().dump();
+    assertTrue(procCounter.get() >= LAST_PROC_ID);
+    assertTrue(procStore.getStoreTracker().isEmpty());
+    assertEquals(1, procStore.getActiveLogs().size());
+  }
+
   private void corruptLog(final FileStatus logFile, final long dropBytes)
       throws IOException {
     assertTrue(logFile.getLen() > dropBytes);

Reply via email to