HBASE-15872 Split TestWALProcedureStore

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0acd49dd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0acd49dd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0acd49dd

Branch: refs/heads/branch-1
Commit: 0acd49ddfe179b2a217e3c6720ada38d48b1cd46
Parents: 108a130
Author: Matteo Bertozzi <matteo.berto...@cloudera.com>
Authored: Fri May 20 13:07:12 2016 -0700
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Fri May 20 13:18:12 2016 -0700

----------------------------------------------------------------------
 .../procedure2/ProcedureTestingUtility.java     |  77 ++++++++++
 .../store/wal/TestStressWALProcedureStore.java  | 134 ++++++++++++++++++
 .../store/wal/TestWALProcedureStore.java        | 139 +------------------
 3 files changed, 212 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0acd49dd/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 45ab4bd..9d41473 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.procedure2;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
 import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import 
org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
@@ -241,4 +244,78 @@ public class ProcedureTestingUtility {
     @Override
     protected void deserializeStateData(final InputStream stream) throws 
IOException { }
   }
+
+  public static class LoadCounter implements ProcedureStore.ProcedureLoader {
+    private final ArrayList<Procedure> corrupted = new ArrayList<Procedure>();
+    private final ArrayList<Procedure> loaded = new ArrayList<Procedure>();
+
+    private Set<Long> procIds;
+    private long maxProcId = 0;
+
+    public LoadCounter() {
+      this(null);
+    }
+
+    public LoadCounter(final Set<Long> procIds) {
+      this.procIds = procIds;
+    }
+
+    public void reset() {
+      reset(null);
+    }
+
+    public void reset(final Set<Long> procIds) {
+      corrupted.clear();
+      loaded.clear();
+      this.procIds = procIds;
+      this.maxProcId = 0;
+    }
+
+    public long getMaxProcId() {
+      return maxProcId;
+    }
+
+    public ArrayList<Procedure> getLoaded() {
+      return loaded;
+    }
+
+    public int getLoadedCount() {
+      return loaded.size();
+    }
+
+    public ArrayList<Procedure> getCorrupted() {
+      return corrupted;
+    }
+
+    public int getCorruptedCount() {
+      return corrupted.size();
+    }
+
+    @Override
+    public void setMaxProcId(long maxProcId) {
+      this.maxProcId = maxProcId; // "this." is required: the parameter shadows the field
+    }
+
+    @Override
+    public void load(ProcedureIterator procIter) throws IOException {
+      while (procIter.hasNext()) {
+        Procedure proc = procIter.nextAsProcedure();
+        LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
+        if (procIds != null) {
+          assertTrue("procId=" + proc.getProcId() + " unexpected",
+                     procIds.contains(proc.getProcId()));
+        }
+        loaded.add(proc);
+      }
+    }
+
+    @Override
+    public void handleCorrupted(ProcedureIterator procIter) throws IOException 
{
+      while (procIter.hasNext()) {
+        Procedure proc = procIter.nextAsProcedure();
+        LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
+        corrupted.add(proc);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0acd49dd/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
new file mode 100644
index 0000000..1c1af79
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
+import 
org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, LargeTests.class})
+public class TestStressWALProcedureStore {
+  private static final Log LOG = LogFactory.getLog(TestStressWALProcedureStore.class);
+
+  private static final int PROCEDURE_STORE_SLOTS = 8;
+
+  private WALProcedureStore procStore;
+
+  private HBaseCommonTestingUtility htu;
+  private FileSystem fs;
+  private Path testDir;
+  private Path logDir;
+
+  private void setupConfiguration(Configuration conf) {
+    conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
+    conf.setInt("hbase.procedure.store.wal.periodic.roll.msec", 5000);
+    conf.setInt("hbase.procedure.store.wal.roll.threshold", 128 * 1024);
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    setupConfiguration(htu.getConfiguration());
+
+    testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(htu.getConfiguration());
+    assertTrue(testDir.depth() > 1);
+
+    logDir = new Path(testDir, "proc-logs");
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), 
fs, logDir);
+    procStore.start(PROCEDURE_STORE_SLOTS);
+    procStore.recoverLease();
+
+    LoadCounter loader = new LoadCounter();
+    procStore.load(loader);
+    assertEquals(0, loader.getMaxProcId());
+    assertEquals(0, loader.getLoadedCount());
+    assertEquals(0, loader.getCorruptedCount());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procStore.stop(false);
+    fs.delete(logDir, true);
+  }
+
+  @Test
+  public void testInsertUpdateDelete() throws Exception {
+    final long LAST_PROC_ID = 19999;
+    final Thread[] thread = new Thread[PROCEDURE_STORE_SLOTS];
+    final AtomicLong procCounter = new 
AtomicLong((long)Math.round(Math.random() * 100));
+    for (int i = 0; i < thread.length; ++i) {
+      thread[i] = new Thread() {
+        @Override
+        public void run() {
+          Random rand = new Random();
+          TestProcedure proc;
+          do {
+            proc = new TestProcedure(procCounter.addAndGet(1));
+            // Insert
+            procStore.insert(proc, null);
+            // Update
+            for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
+              try { Thread.sleep(0, rand.nextInt(15)); } catch 
(InterruptedException e) {}
+              procStore.update(proc);
+            }
+            // Delete
+            procStore.delete(proc.getProcId());
+          } while (proc.getProcId() < LAST_PROC_ID);
+        }
+      };
+      thread[i].start();
+    }
+
+    for (int i = 0; i < thread.length; ++i) {
+      thread[i].join();
+    }
+
+    procStore.getStoreTracker().dump();
+    assertTrue(procCounter.get() >= LAST_PROC_ID);
+    assertTrue(procStore.getStoreTracker().isEmpty());
+    assertEquals(1, procStore.getActiveLogs().size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0acd49dd/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index e665cce..e97dd2a 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -22,15 +22,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
 import 
org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
 import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
@@ -417,66 +414,6 @@ public class TestWALProcedureStore {
   }
 
   @Test
-  public void testInsertUpdateDelete() throws Exception {
-    final int NTHREAD = 2;
-
-    procStore.stop(false);
-    fs.delete(logDir, true);
-
-    org.apache.hadoop.conf.Configuration conf =
-      new org.apache.hadoop.conf.Configuration(htu.getConfiguration());
-    conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
-    conf.setInt("hbase.procedure.store.wal.periodic.roll.msec", 10000);
-    conf.setInt("hbase.procedure.store.wal.roll.threshold", 128 * 1024);
-
-    fs.mkdirs(logDir);
-    procStore = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
-    procStore.start(NTHREAD);
-    procStore.recoverLease();
-
-    LoadCounter loader = new LoadCounter();
-    procStore.load(loader);
-    assertEquals(0, loader.getMaxProcId());
-    assertEquals(0, loader.getLoadedCount());
-    assertEquals(0, loader.getCorruptedCount());
-
-    final long LAST_PROC_ID = 9999;
-    final Thread[] thread = new Thread[NTHREAD];
-    final AtomicLong procCounter = new 
AtomicLong((long)Math.round(Math.random() * 100));
-    for (int i = 0; i < thread.length; ++i) {
-      thread[i] = new Thread() {
-        @Override
-        public void run() {
-          Random rand = new Random();
-          TestProcedure proc;
-          do {
-            proc = new TestProcedure(procCounter.addAndGet(1));
-            // Insert
-            procStore.insert(proc, null);
-            // Update
-            for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
-              try { Thread.sleep(0, rand.nextInt(15)); } catch 
(InterruptedException e) {}
-              procStore.update(proc);
-            }
-            // Delete
-            procStore.delete(proc.getProcId());
-          } while (proc.getProcId() < LAST_PROC_ID);
-        }
-      };
-      thread[i].start();
-    }
-
-    for (int i = 0; i < thread.length; ++i) {
-      thread[i].join();
-    }
-
-    procStore.getStoreTracker().dump();
-    assertTrue(procCounter.get() >= LAST_PROC_ID);
-    assertTrue(procStore.getStoreTracker().isEmpty());
-    assertEquals(1, procStore.getActiveLogs().size());
-  }
-
-  @Test
   public void testRollAndRemove() throws IOException {
     // Insert something in the log
     Procedure proc1 = new TestSequentialProcedure();
@@ -575,78 +512,4 @@ public class TestWALProcedureStore {
       }
     }
   }
-
-  private class LoadCounter implements ProcedureStore.ProcedureLoader {
-    private final ArrayList<Procedure> corrupted = new ArrayList<Procedure>();
-    private final ArrayList<Procedure> loaded = new ArrayList<Procedure>();
-
-    private Set<Long> procIds;
-    private long maxProcId = 0;
-
-    public LoadCounter() {
-      this(null);
-    }
-
-    public LoadCounter(final Set<Long> procIds) {
-      this.procIds = procIds;
-    }
-
-    public void reset() {
-      reset(null);
-    }
-
-    public void reset(final Set<Long> procIds) {
-      corrupted.clear();
-      loaded.clear();
-      this.procIds = procIds;
-      this.maxProcId = 0;
-    }
-
-    public long getMaxProcId() {
-      return maxProcId;
-    }
-
-    public ArrayList<Procedure> getLoaded() {
-      return loaded;
-    }
-
-    public int getLoadedCount() {
-      return loaded.size();
-    }
-
-    public ArrayList<Procedure> getCorrupted() {
-      return corrupted;
-    }
-
-    public int getCorruptedCount() {
-      return corrupted.size();
-    }
-
-    @Override
-    public void setMaxProcId(long maxProcId) {
-      maxProcId = maxProcId;
-    }
-
-    @Override
-    public void load(ProcedureIterator procIter) throws IOException {
-      while (procIter.hasNext()) {
-        Procedure proc = procIter.nextAsProcedure();
-        LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
-        if (procIds != null) {
-          assertTrue("procId=" + proc.getProcId() + " unexpected",
-                     procIds.contains(proc.getProcId()));
-        }
-        loaded.add(proc);
-      }
-    }
-
-    @Override
-    public void handleCorrupted(ProcedureIterator procIter) throws IOException 
{
-      while (procIter.hasNext()) {
-        Procedure proc = procIter.nextAsProcedure();
-        LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
-        corrupted.add(proc);
-      }
-    }
-  }
 }

Reply via email to