HBASE-13202 Procedure v2 - core framework (addendum)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d75326a7 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d75326a7 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d75326a7 Branch: refs/heads/hbase-12439 Commit: d75326a7974881a41993e210b9c5b7d4b0fe5b8b Parents: 4f15144 Author: Matteo Bertozzi <[email protected]> Authored: Wed Apr 15 09:39:25 2015 +0100 Committer: Matteo Bertozzi <[email protected]> Committed: Wed Apr 15 09:50:47 2015 +0100 ---------------------------------------------------------------------- .../procedure2/ProcedureFairRunQueues.java | 1 + .../procedure2/store/ProcedureStoreTracker.java | 12 ++++++++-- .../procedure2/store/wal/WALProcedureStore.java | 6 ++--- .../store/TestProcedureStoreTracker.java | 25 ++++++++++++++++++++ 4 files changed, 39 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d75326a7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java index 03d007a..242ae86 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java @@ -95,6 +95,7 @@ public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues. public void clear() { lock.lock(); try { + currentQuantum = 0; current = null; objMap.clear(); } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/d75326a7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 4e4653a..a4711f1 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -195,11 +195,12 @@ public class ProcedureStoreTracker { // Grow/Merge Helpers // ======================================================================== public boolean canGrow(final long procId) { - return (procId - start) < MAX_NODE_SIZE; + return Math.abs(procId - start) < MAX_NODE_SIZE; } public boolean canMerge(final BitSetNode rightNode) { - return (start + rightNode.getEnd()) < MAX_NODE_SIZE; + assert start < rightNode.getEnd(); + return (rightNode.getEnd() - start) < MAX_NODE_SIZE; } public void grow(final long procId) { @@ -258,6 +259,11 @@ public class ProcedureStoreTracker { } } + @Override + public String toString() { + return "BitSetNode(" + getStart() + "-" + getEnd() + ")"; + } + // ======================================================================== // Min/Max Helpers // ======================================================================== @@ -377,6 +383,7 @@ public class ProcedureStoreTracker { @InterfaceAudience.Private public void setDeleted(final long procId, final boolean isDeleted) { BitSetNode node = getOrCreateNode(procId); + assert node.contains(procId) : "expected procId in the node"; node.updateState(procId, isDeleted); } @@ -507,6 +514,7 @@ public class ProcedureStoreTracker { } private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) { + assert leftNode.getStart() < rightNode.getStart(); leftNode.merge(rightNode); map.remove(rightNode.getStart()); return leftNode; http://git-wip-us.apache.org/repos/asf/hbase/blob/d75326a7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 13f7bfa..09d2f7a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -389,7 +389,7 @@ public class WALProcedureStore implements ProcedureStore { } private long pushData(final ByteSlot slot) { - assert !logs.isEmpty() : "recoverLease() must be called before inserting data"; + assert isRunning() && !logs.isEmpty() : "recoverLease() must be called before inserting data"; long logId = -1; lock.lock(); @@ -677,7 +677,7 @@ public class WALProcedureStore implements ProcedureStore { try { log.readTracker(storeTracker); } catch (IOException e) { - LOG.error("Unable to read tracker for " + log, e); + LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage()); // try the next one... storeTracker.clear(); storeTracker.setPartialFlag(true); @@ -718,4 +718,4 @@ public class WALProcedureStore implements ProcedureStore { } return log; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d75326a7/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index 0669549..be759dc 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.procedure2.store; import java.io.InputStream; import java.io.OutputStream; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -165,4 +166,28 @@ public class TestProcedureStoreTracker { tracker.delete(procs[5].getProcId()); assertTrue(tracker.isEmpty()); } + + @Test + public void testRandLoad() { + final int NPROCEDURES = 2500; + final int NRUNS = 5000; + + final ProcedureStoreTracker tracker = new ProcedureStoreTracker(); + + Random rand = new Random(1); + for (int i = 0; i < NRUNS; ++i) { + assertTrue(tracker.isEmpty()); + + int count = 0; + while (count < NPROCEDURES) { + long procId = rand.nextLong(); + if (procId < 1) continue; + + tracker.setDeleted(procId, i % 2 == 0); + count++; + } + + tracker.clear(); + } + } }
