This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 0fc588dfd0949e8b9d96eaee3ce2a8d6d7cef66a Author: Duo Zhang <zhang...@apache.org> AuthorDate: Thu Sep 29 10:08:02 2022 +0800 HBASE-27392 Add a new procedure type for implementing some global operations such as migration (#4803) Signed-off-by: Xin Sun <ddu...@gmail.com> --- .../hbase/procedure2/LockedResourceType.java | 3 +- .../master/procedure/GlobalProcedureInterface.java | 15 ++- .../hadoop/hbase/master/procedure/GlobalQueue.java | 21 ++-- .../master/procedure/MasterProcedureScheduler.java | 119 ++++++++++++++++++++- .../hbase/master/procedure/SchemaLocking.java | 18 +++- .../procedure/TestMasterProcedureScheduler.java | 48 +++++++++ 6 files changed, 202 insertions(+), 22 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java index 12f899d7565..40141017009 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java @@ -26,5 +26,6 @@ public enum LockedResourceType { TABLE, REGION, PEER, - META + META, + GLOBAL } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java similarity index 82% copy from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java copy to hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java index 12f899d7565..1ef168abfd8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java @@ -15,16 +15,15 @@ * See the License for the specific language governing permissions and * limitations under 
the License. */ -package org.apache.hadoop.hbase.procedure2; +package org.apache.hadoop.hbase.master.procedure; import org.apache.yetus.audience.InterfaceAudience; +/** + * Procedure interface for global operations, such as migration. + */ @InterfaceAudience.Private -public enum LockedResourceType { - SERVER, - NAMESPACE, - TABLE, - REGION, - PEER, - META +public interface GlobalProcedureInterface { + + String getGlobalId(); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java similarity index 69% copy from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java copy to hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java index 12f899d7565..1633dc4856e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java @@ -15,16 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hbase.procedure2; +package org.apache.hadoop.hbase.master.procedure; +import org.apache.hadoop.hbase.procedure2.LockStatus; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public enum LockedResourceType { - SERVER, - NAMESPACE, - TABLE, - REGION, - PEER, - META +public class GlobalQueue extends Queue<String> { + + public GlobalQueue(String globalId, LockStatus lockStatus) { + super(globalId, lockStatus); + } + + @Override + boolean requireExclusiveLock(Procedure<?> proc) { + return true; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 866f2f6f403..fbf0eb8abf3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; @@ -95,16 +96,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { (n, k) -> n.compareKey((String) k); private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR = (n, k) -> n.compareKey((TableName) k); + private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR = + (n, k) -> n.compareKey((String) k); private final FairQueue<ServerName> serverRunQueue = new FairQueue<>(); private final FairQueue<TableName> tableRunQueue = new FairQueue<>(); private final FairQueue<String> peerRunQueue = new FairQueue<>(); private final 
FairQueue<TableName> metaRunQueue = new FairQueue<>(); + private final FairQueue<String> globalRunQueue = new FairQueue<>(); private final ServerQueue[] serverBuckets = new ServerQueue[128]; private TableQueue tableMap = null; private PeerQueue peerMap = null; private MetaQueue metaMap = null; + private GlobalQueue globalMap = null; private final SchemaLocking locking; @@ -128,6 +133,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront); } else if (isPeerProcedure(proc)) { doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); + } else if (isGlobalProcedure(proc)) { + doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront); } else { // TODO: at the moment we only have Table and Server procedures // if you are implementing a non-table/non-server procedure, you have two options: create @@ -163,14 +170,19 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { @Override protected boolean queueHasRunnables() { - return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() - || serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables(); + return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables() + || tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables() + || peerRunQueue.hasRunnables(); } @Override protected Procedure dequeue() { - // meta procedure is always the first priority - Procedure<?> pollResult = doPoll(metaRunQueue); + // pull global first + Procedure<?> pollResult = doPoll(globalRunQueue); + // then meta procedure + if (pollResult == null) { + pollResult = doPoll(metaRunQueue); + } // For now, let server handling have precedence over table handling; presumption is that it // is more important handling crashed servers than it is running the // enabling/disabling tables, etc. 
@@ -268,6 +280,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR); peerMap = null; + // Remove Meta + clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR); + metaMap = null; + + // Remove Global + clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR); + globalMap = null; + assert size() == 0 : "expected queue size to be 0, got " + size(); } @@ -300,6 +320,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { count += queueSize(tableMap); count += queueSize(peerMap); count += queueSize(metaMap); + count += queueSize(globalMap); return count; } @@ -502,6 +523,51 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return proc instanceof MetaProcedureInterface; } + // ============================================================================ + // Global Queue Lookup Helpers + // ============================================================================ + private GlobalQueue getGlobalQueue(String globalId) { + GlobalQueue node = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); + if (node != null) { + return node; + } + node = new GlobalQueue(globalId, locking.getGlobalLock(globalId)); + globalMap = AvlTree.insert(globalMap, node); + return node; + } + + private void removeGlobalQueue(String globalId) { + globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); + locking.removeGlobalLock(globalId); + } + + private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) { + schedLock(); + try { + GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); + if (queue == null) { + return; + } + + final LockAndQueue lock = locking.getGlobalLock(globalId); + if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) { + removeFromRunQueue(globalRunQueue, queue, + () -> "clean up global queue after " + procedure + " completed"); + removeGlobalQueue(globalId); 
+ } + } finally { + schedUnlock(); + } + } + + private static boolean isGlobalProcedure(Procedure<?> proc) { + return proc instanceof GlobalProcedureInterface; + } + + private static String getGlobalId(Procedure<?> proc) { + return ((GlobalProcedureInterface) proc).getGlobalId(); + } + // ============================================================================ // Table Locking Helpers // ============================================================================ @@ -1006,6 +1072,51 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } + // ============================================================================ + // Global Locking Helpers + // ============================================================================ + /** + * Try to acquire the exclusive lock on the given global resource. + * @see #wakeGlobalExclusiveLock(Procedure, String) + * @param procedure the procedure trying to acquire the lock + * @return true if the procedure has to wait for the global lock to be available + */ + public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) { + schedLock(); + try { + final LockAndQueue lock = locking.getGlobalLock(globalId); + if (lock.tryExclusiveLock(procedure)) { + removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId), + () -> procedure + " held shared lock"); + return false; + } + waitProcedure(lock, procedure); + logLockedResource(LockedResourceType.GLOBAL, HConstants.EMPTY_STRING); + return true; + } finally { + schedUnlock(); + } + } + + /** + * Release the exclusive lock on the given global resource and wake the procedures waiting for it.
+ * @see #waitGlobalExclusiveLock(Procedure, String) + * @param procedure the procedure releasing the lock + */ + public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) { + schedLock(); + try { + final LockAndQueue lock = locking.getGlobalLock(globalId); + lock.releaseExclusiveLock(procedure); + addToRunQueue(globalRunQueue, getGlobalQueue(globalId), + () -> procedure + " released shared lock"); + int waitingCount = wakeWaitingProcedures(lock); + wakePollIfNeeded(waitingCount); + } finally { + schedUnlock(); + } + } + /** * For debugging. Expensive. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java index 13419ac455c..853d13b0c93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java @@ -53,6 +53,7 @@ class SchemaLocking { // Single map for all regions irrespective of tables. Key is encoded region name. 
private final Map<String, LockAndQueue> regionLocks = new HashMap<>(); private final Map<String, LockAndQueue> peerLocks = new HashMap<>(); + private final Map<String, LockAndQueue> globalLocks = new HashMap<>(); private final LockAndQueue metaLock; public SchemaLocking(Function<Long, Procedure<?>> procedureRetriever) { @@ -94,6 +95,10 @@ class SchemaLocking { return metaLock; } + LockAndQueue getGlobalLock(String globalId) { + return getLock(globalLocks, globalId); + } + LockAndQueue removeRegionLock(String encodedRegionName) { return regionLocks.remove(encodedRegionName); } @@ -114,6 +119,10 @@ class SchemaLocking { return peerLocks.remove(peerId); } + LockAndQueue removeGlobalLock(String globalId) { + return globalLocks.remove(globalId); + } + private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName, LockAndQueue queue) { LockType lockType; @@ -164,6 +173,8 @@ class SchemaLocking { addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER); addToLockedResources(lockedResources, ImmutableMap.of(TableName.META_TABLE_NAME, metaLock), tn -> tn.getNameAsString(), LockedResourceType.META); + addToLockedResources(lockedResources, globalLocks, Function.identity(), + LockedResourceType.GLOBAL); return lockedResources; } @@ -191,6 +202,10 @@ class SchemaLocking { break; case META: queue = metaLock; + break; + case GLOBAL: + queue = globalLocks.get(resourceName); + break; default: queue = null; break; @@ -216,7 +231,8 @@ class SchemaLocking { + filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks) + ", regionLocks=" + filterUnlocked(this.regionLocks) + ", peerLocks=" + filterUnlocked(this.peerLocks) + ", metaLocks=" - + filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock)); + + filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock)) + ", globalLocks=" + + filterUnlocked(globalLocks); } private String filterUnlocked(Map<?, LockAndQueue> 
locks) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index f0edf73715e..0cf34126a94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -940,6 +940,21 @@ public class TestMasterProcedureScheduler { } } + public static class TestGlobalProcedure extends TestProcedure + implements GlobalProcedureInterface { + private final String globalId; + + public TestGlobalProcedure(long procId, String globalId) { + super(procId); + this.globalId = globalId; + } + + @Override + public String getGlobalId() { + return globalId; + } + } + private static LockProcedure createLockProcedure(LockType lockType, long procId) throws Exception { LockProcedure procedure = new LockProcedure(); @@ -1093,6 +1108,39 @@ public class TestMasterProcedureScheduler { assertEquals(1, resource.getWaitingProcedures().size()); } + @Test + public void testListLocksGlobal() throws Exception { + String globalId = "1"; + LockProcedure procedure = createExclusiveLockProcedure(4); + queue.waitGlobalExclusiveLock(procedure, globalId); + + List<LockedResource> locks = queue.getLocks(); + assertEquals(1, locks.size()); + + LockedResource resource = locks.get(0); + assertLockResource(resource, LockedResourceType.GLOBAL, globalId); + assertExclusiveLock(resource, procedure); + assertTrue(resource.getWaitingProcedures().isEmpty()); + + // Try to acquire the exclusive lock again with same procedure + assertFalse(queue.waitGlobalExclusiveLock(procedure, globalId)); + + // Try to acquire the exclusive lock again with new procedure + LockProcedure procedure2 = createExclusiveLockProcedure(5); + assertTrue(queue.waitGlobalExclusiveLock(procedure2, globalId)); + + // Same globalId,
still only has 1 LockedResource + locks = queue.getLocks(); + assertEquals(1, locks.size()); + + resource = locks.get(0); + assertLockResource(resource, LockedResourceType.GLOBAL, globalId); + // LockedResource owner is still the original procedure + assertExclusiveLock(resource, procedure); + // The new procedure should be in the waiting list + assertEquals(1, resource.getWaitingProcedures().size()); + } + @Test public void testListLocksWaiting() throws Exception { LockProcedure procedure1 = createExclusiveLockProcedure(1);