HBASE-16786 Procedure V2 - Move ZK-lock's uses to Procedure framework locks (LockProcedure) - Matteo Bertozzi. Locks are no longer hosted in ZooKeeper; instead, they are hosted by the Master.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/76dc957f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/76dc957f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/76dc957f Branch: refs/heads/master Commit: 76dc957f64fa38ce88694054db7dbf590f368ae7 Parents: bff7c4f Author: Michael Stack <st...@apache.org> Authored: Mon Jan 16 22:18:53 2017 -0800 Committer: Michael Stack <st...@apache.org> Committed: Thu Jan 19 09:34:17 2017 -0800 ---------------------------------------------------------------------- .../hbase/procedure2/ProcedureExecutor.java | 2 +- .../procedure2/TestProcedureSuspended.java | 10 +- .../hbase/rsgroup/RSGroupAdminServer.java | 14 +- .../hadoop/hbase/client/locking/EntityLock.java | 25 +- .../hadoop/hbase/ipc/RpcServerFactory.java | 13 +- .../hadoop/hbase/ipc/SimpleRpcServer.java | 4 +- .../hadoop/hbase/master/AssignmentManager.java | 8 +- .../master/ExpiredMobFileCleanerChore.java | 36 +- .../org/apache/hadoop/hbase/master/HMaster.java | 8 +- .../hbase/master/MasterMobCompactionThread.java | 19 +- .../hadoop/hbase/master/MasterRpcServices.java | 86 +++- .../hadoop/hbase/master/MasterServices.java | 5 - .../hadoop/hbase/master/MobCompactionChore.java | 9 +- .../hadoop/hbase/master/TableLockManager.java | 453 ------------------- .../hbase/master/locking/LockManager.java | 22 +- .../hbase/master/locking/LockProcedure.java | 1 + .../master/procedure/MasterProcedureEnv.java | 3 +- .../procedure/MasterProcedureScheduler.java | 183 ++------ .../master/procedure/MasterProcedureUtil.java | 2 +- .../master/snapshot/TakeSnapshotHandler.java | 40 +- .../org/apache/hadoop/hbase/mob/MobUtils.java | 30 +- .../hbase/regionserver/CompactSplitThread.java | 2 +- .../hadoop/hbase/regionserver/HMobStore.java | 70 ++- .../hbase/regionserver/HRegionServer.java | 45 +- .../hadoop/hbase/regionserver/HStore.java | 1 - .../regionserver/RegionServerServices.java | 14 +- 
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 33 +- .../hbase/util/hbck/TableLockChecker.java | 87 ---- .../hadoop/hbase/MockRegionServerServices.java | 14 +- .../hbase/coprocessor/TestMasterObserver.java | 3 +- .../hbase/master/MockNoopMasterServices.java | 5 - .../hadoop/hbase/master/MockRegionServer.java | 13 +- .../hbase/master/TestTableLockManager.java | 433 ------------------ .../hbase/master/locking/TestLockProcedure.java | 9 - ...ProcedureSchedulerPerformanceEvaluation.java | 4 +- .../procedure/TestMasterProcedureScheduler.java | 42 +- ...TestMasterProcedureSchedulerConcurrency.java | 4 +- .../TestMergeTableRegionsProcedure.java | 231 ---------- .../regionserver/TestMobStoreCompaction.java | 19 +- .../regionserver/TestRegionServerMetrics.java | 7 +- .../security/token/TestTokenAuthentication.java | 1 - .../hadoop/hbase/util/TestHBaseFsckOneRS.java | 80 ---- .../hadoop/hbase/util/hbck/HbckTestingUtil.java | 1 - 43 files changed, 325 insertions(+), 1766 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index d3b65e8..0912cb7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -776,7 +776,7 @@ public class ProcedureExecutor<TEnvironment> { if (nonceKey != null) { currentProcId = nonceKeysToProcIdsMap.get(nonceKey); Preconditions.checkArgument(currentProcId != null, - "expected nonceKey=" + nonceKey + " to be reserved, use registerNonce()"); + "Expected nonceKey=" + nonceKey + " 
to be reserved, use registerNonce(); proc=" + proc); } else { currentProcId = nextProcId(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java index 9a108a8..0a8b0e4 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.procedure2; +import static org.junit.Assert.assertEquals; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -28,20 +30,16 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; -import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Threads; - import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - @Category({MasterTests.class, SmallTests.class}) public class TestProcedureSuspended { private static final Log LOG = LogFactory.getLog(TestProcedureSuspended.class); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java index dc28f7d..bf0feab 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -53,7 +53,8 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.ServerManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.master.locking.LockManager; +import org.apache.hadoop.hbase.master.locking.LockProcedure; /** * Service to support Region Server Grouping (HBase-6721) @@ -273,10 +274,15 @@ public class RSGroupAdminServer extends RSGroupAdmin { master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup); } } - for(TableName table: tables) { - TableLock lock = master.getTableLockManager().writeLock(table, "Group: table move"); + for (TableName table: tables) { + LockManager.MasterLock lock = master.getLockManager().createMasterLock(table, + LockProcedure.LockType.EXCLUSIVE, this.getClass().getName() + ": Group: table move"); try { - lock.acquire(); + try { + lock.acquire(); + } catch (InterruptedException e) { + throw new IOException("Interrupted when waiting for table lock", e); + } for (HRegionInfo region : master.getAssignmentManager().getRegionStates().getRegionsOfTable(table)) { master.getAssignmentManager().unassign(region); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/EntityLock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/EntityLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/EntityLock.java index 990c76d..c141c3e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/EntityLock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/locking/EntityLock.java @@ -164,6 +164,8 @@ public class EntityLock { /** * Sends rpc to the master to request lock. * The lock request is queued with other lock requests. + * Call {@link #await()} to wait on lock. + * Always call {@link #unlock()} after calling the below, even after error. */ public void requestLock() throws IOException { if (procId == null) { @@ -200,9 +202,7 @@ public class EntityLock { } public void unlock() throws IOException { - locked.set(false); - worker.interrupt(); - Threads.shutdown(worker); + Threads.shutdown(worker.shutdown()); try { stub.lockHeartbeat(null, LockHeartbeatRequest.newBuilder().setProcId(procId).setKeepAlive(false).build()); @@ -212,8 +212,21 @@ public class EntityLock { } protected class LockHeartbeatWorker extends Thread { + private volatile boolean shutdown = false; + public LockHeartbeatWorker(final String desc) { super("LockHeartbeatWorker(" + desc + ")"); + setDaemon(true); + } + + /** + * Shutdown the thread cleanly, quietly. We done. + * @return + */ + Thread shutdown() { + shutdown = true; + interrupt(); + return this; } public void run() { @@ -256,8 +269,10 @@ public class EntityLock { } catch (InterruptedException e) { // Since there won't be any more heartbeats, assume lock will be lost. 
locked.set(false); - LOG.error("Interrupted, releasing " + EntityLock.this, e); - abort.abort("Worker thread interrupted", e); + if (!this.shutdown) { + LOG.error("Interrupted, releasing " + this, e); + abort.abort("Worker thread interrupted", e); + } return; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerFactory.java index 7d91a2c..eb2b70e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerFactory.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.ServiceDescriptor; import org.apache.hadoop.hbase.util.ReflectionUtils; @InterfaceAudience.Private @@ -48,11 +49,17 @@ public class RpcServerFactory { RpcScheduler scheduler) throws IOException { String rpcServerClass = conf.get(CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName()); - LOG.info("Use " + rpcServerClass + " rpc server"); + StringBuffer servicesList = new StringBuffer(); + for (BlockingServiceAndInterface s: services) { + ServiceDescriptor sd = s.getBlockingService().getDescriptorForType(); + if (sd == null) continue; // Can be null for certain tests like TestTokenAuthentication + if (servicesList.length() > 0) servicesList.append(", "); + servicesList.append(sd.getFullName()); + } + LOG.info("Creating " + rpcServerClass + " hosting " + servicesList); return ReflectionUtils.instantiateWithCustomCtor(rpcServerClass, 
new Class[] { Server.class, String.class, List.class, InetSocketAddress.class, Configuration.class, RpcScheduler.class }, new Object[] { server, name, services, bindAddress, conf, scheduler }); } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java index 01d45cd..075d8b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java @@ -1273,7 +1273,9 @@ public class SimpleRpcServer extends RpcServer { String serviceName = connectionHeader.getServiceName(); if (serviceName == null) throw new EmptyServiceNameException(); this.service = getService(services, serviceName); - if (this.service == null) throw new UnknownServiceException(serviceName); + if (this.service == null) { + throw new UnknownServiceException(serviceName); + } setupCellBlockCodecs(this.connectionHeader); RPCProtos.ConnectionHeaderResponse.Builder chrBuilder = RPCProtos.ConnectionHeaderResponse.newBuilder(); http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 3ab4678..3005334 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -112,8 +112,6 @@ public class AssignmentManager { 
private final MetricsAssignmentManager metricsAssignmentManager; - private final TableLockManager tableLockManager; - private AtomicInteger numRegionsOpened = new AtomicInteger(0); final private KeyLocker<String> locker = new KeyLocker<String>(); @@ -212,13 +210,10 @@ public class AssignmentManager { * @param balancer implementation of {@link LoadBalancer} * @param service Executor service * @param metricsMaster metrics manager - * @param tableLockManager TableLock manager * @throws IOException */ public AssignmentManager(MasterServices server, ServerManager serverManager, - final LoadBalancer balancer, - final ExecutorService service, MetricsMaster metricsMaster, - final TableLockManager tableLockManager, + final LoadBalancer balancer, final ExecutorService service, MetricsMaster metricsMaster, final TableStateManager tableStateManager) throws IOException { this.server = server; @@ -258,7 +253,6 @@ public class AssignmentManager { conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000); this.metricsAssignmentManager = new MetricsAssignmentManager(); - this.tableLockManager = tableLockManager; // Configurations for retrying opening a region on receiving a FAILED_OPEN this.retryConfig = new RetryCounter.RetryConfig(); http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java index 3261bd6..faa4f0e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master; -import java.io.IOException; import java.util.Map; import 
java.util.concurrent.TimeUnit; @@ -29,8 +28,8 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.exceptions.LockTimeoutException; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.master.locking.LockManager; +import org.apache.hadoop.hbase.master.locking.LockProcedure; import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; @@ -44,7 +43,6 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore { private static final Log LOG = LogFactory.getLog(ExpiredMobFileCleanerChore.class); private final HMaster master; - private TableLockManager tableLockManager; private ExpiredMobFileCleaner cleaner; public ExpiredMobFileCleanerChore(HMaster master) { @@ -53,7 +51,6 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore { .getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD, MobConstants.DEFAULT_MOB_CLEANER_PERIOD), TimeUnit.SECONDS); this.master = master; - this.tableLockManager = master.getTableLockManager(); cleaner = new ExpiredMobFileCleaner(); cleaner.setConf(master.getConfiguration()); } @@ -70,33 +67,14 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore { if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) { // clean only for mob-enabled column. // obtain a read table lock before cleaning, synchronize with MobFileCompactionChore. - boolean tableLocked = false; - TableLock lock = null; + final LockManager.MasterLock lock = master.getLockManager().createMasterLock( + MobUtils.getTableLockName(htd.getTableName()), LockProcedure.LockType.SHARED, + this.getClass().getSimpleName() + ": Cleaning expired mob files"); try { - // the tableLockManager might be null in testing. 
In that case, it is lock-free. - if (tableLockManager != null) { - lock = tableLockManager.readLock(MobUtils.getTableLockName(htd.getTableName()), - "Run ExpiredMobFileCleanerChore"); - lock.acquire(); - } - tableLocked = true; + lock.acquire(); cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd); - } catch (LockTimeoutException e) { - LOG.info("Fail to acquire the lock because of timeout, maybe a" - + " MobCompactor is running", e); - } catch (IOException e) { - LOG.error( - "Fail to clean the expired mob files for the column " + hcd.getNameAsString() - + " in the table " + htd.getNameAsString(), e); } finally { - if (lock != null && tableLocked) { - try { - lock.release(); - } catch (IOException e) { - LOG.error( - "Fail to release the read lock for the table " + htd.getNameAsString(), e); - } - } + lock.release(); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index ab7a25e..154958b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -645,8 +645,7 @@ public class HMaster extends HRegionServer implements MasterServices { this.splitOrMergeTracker.start(); this.assignmentManager = new AssignmentManager(this, serverManager, - this.balancer, this.service, this.metricsMaster, - this.tableLockManager, tableStateManager); + this.balancer, this.service, this.metricsMaster, tableStateManager); this.replicationManager = new ReplicationManager(conf, zooKeeper, this); @@ -732,8 +731,6 @@ public class HMaster extends HRegionServer implements MasterServices { this.serverManager = createServerManager(this); - // Invalidate all write 
locks held previously - this.tableLockManager.reapWriteLocks(); this.tableStateManager = new TableStateManager(this); status.setStatus("Initializing ZK system trackers"); @@ -3030,8 +3027,7 @@ public class HMaster extends HRegionServer implements MasterServices { */ public void requestMobCompaction(TableName tableName, List<HColumnDescriptor> columns, boolean allFiles) throws IOException { - mobCompactThread.requestMobCompaction(conf, fs, tableName, columns, - tableLockManager, allFiles); + mobCompactThread.requestMobCompaction(conf, fs, tableName, columns, allFiles); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java index c0a915b..fc0ecfb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java @@ -34,6 +34,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.master.locking.LockManager; +import org.apache.hadoop.hbase.master.locking.LockProcedure; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -74,15 +76,13 @@ public class MasterMobCompactionThread { * @param fs The file system * @param tableName The table the compact * @param columns The column descriptors - * @param tableLockManager The tableLock manager * @param allFiles Whether add all mob files into the compaction. 
*/ public void requestMobCompaction(Configuration conf, FileSystem fs, TableName tableName, - List<HColumnDescriptor> columns, TableLockManager tableLockManager, boolean allFiles) - throws IOException { + List<HColumnDescriptor> columns, boolean allFiles) throws IOException { master.reportMobCompactionStart(tableName); try { - masterMobPool.execute(new CompactionRunner(fs, tableName, columns, tableLockManager, + masterMobPool.execute(new CompactionRunner(fs, tableName, columns, allFiles, mobCompactorPool)); } catch (RejectedExecutionException e) { // in case the request is rejected by the pool @@ -103,27 +103,28 @@ public class MasterMobCompactionThread { private FileSystem fs; private TableName tableName; private List<HColumnDescriptor> hcds; - private TableLockManager tableLockManager; private boolean allFiles; private ExecutorService pool; public CompactionRunner(FileSystem fs, TableName tableName, List<HColumnDescriptor> hcds, - TableLockManager tableLockManager, boolean allFiles, ExecutorService pool) { + boolean allFiles, ExecutorService pool) { super(); this.fs = fs; this.tableName = tableName; this.hcds = hcds; - this.tableLockManager = tableLockManager; this.allFiles = allFiles; this.pool = pool; } @Override public void run() { + // These locks are on dummy table names, and only used for compaction/mob file cleaning. 
+ final LockManager.MasterLock lock = master.getLockManager().createMasterLock( + MobUtils.getTableLockName(tableName), LockProcedure.LockType.EXCLUSIVE, + this.getClass().getName() + ": mob compaction"); try { for (HColumnDescriptor hcd : hcds) { - MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, tableLockManager, - allFiles); + MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, allFiles, lock); } } catch (IOException e) { LOG.error("Failed to perform the mob compaction", e); http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 1151c92..60b8b65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -52,16 +52,28 @@ import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.locking.LockProcedure; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.procedure.MasterProcedureManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 
+import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.access.AccessController; +import org.apache.hadoop.hbase.security.visibility.VisibilityController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; @@ -71,9 +83,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDe import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.*; -import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.*; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse.Capability; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; @@ -103,16 +119,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; -import org.apache.hadoop.hbase.regionserver.RSRpcServices; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.access.AccessController; -import org.apache.hadoop.hbase.security.visibility.VisibilityController; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import 
org.apache.hadoop.hbase.util.Bytes; @@ -228,13 +234,15 @@ public class MasterRpcServices extends RSRpcServices * @return list of blocking services and their security info classes that this server supports */ protected List<BlockingServiceAndInterface> getServices() { - List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(4); + List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(5); bssi.add(new BlockingServiceAndInterface( MasterService.newReflectiveBlockingService(this), MasterService.BlockingInterface.class)); bssi.add(new BlockingServiceAndInterface( RegionServerStatusService.newReflectiveBlockingService(this), RegionServerStatusService.BlockingInterface.class)); + bssi.add(new BlockingServiceAndInterface(LockService.newReflectiveBlockingService(this), + LockService.BlockingInterface.class)); bssi.addAll(super.getServices()); return bssi; } @@ -1754,34 +1762,62 @@ public class MasterRpcServices extends RSRpcServices } @Override - public LockResponse requestLock(RpcController controller, LockRequest request) + public LockResponse requestLock(RpcController controller, final LockRequest request) throws ServiceException { try { if (request.getDescription().isEmpty()) { throw new IllegalArgumentException("Empty description"); } - final long procId; + NonceProcedureRunnable npr; LockProcedure.LockType type = LockProcedure.LockType.valueOf(request.getLockType().name()); if (request.getRegionInfoCount() > 0) { final HRegionInfo[] regionInfos = new HRegionInfo[request.getRegionInfoCount()]; for (int i = 0; i < request.getRegionInfoCount(); ++i) { regionInfos[i] = HRegionInfo.convert(request.getRegionInfo(i)); } - procId = master.getLockManager().remoteLocks().requestRegionsLock(regionInfos, - request.getDescription(), request.getNonceGroup(), request.getNonce()); - return LockResponse.newBuilder().setProcId(procId).build(); + npr = new NonceProcedureRunnable(master, request.getNonceGroup(), 
request.getNonce()) { + @Override + protected void run() throws IOException { + setProcId(master.getLockManager().remoteLocks().requestRegionsLock(regionInfos, + request.getDescription(), getNonceKey())); + } + + @Override + protected String getDescription() { + return "RequestLock"; + } + }; } else if (request.hasTableName()) { final TableName tableName = ProtobufUtil.toTableName(request.getTableName()); - procId = master.getLockManager().remoteLocks().requestTableLock(tableName, type, - request.getDescription(), request.getNonceGroup(), request.getNonce()); - return LockResponse.newBuilder().setProcId(procId).build(); + npr = new NonceProcedureRunnable(master, request.getNonceGroup(), request.getNonce()) { + @Override + protected void run() throws IOException { + setProcId(master.getLockManager().remoteLocks().requestTableLock(tableName, type, + request.getDescription(), getNonceKey())); + } + + @Override + protected String getDescription() { + return "RequestLock"; + } + }; } else if (request.hasNamespace()) { - procId = master.getLockManager().remoteLocks().requestNamespaceLock( - request.getNamespace(), type, request.getDescription(), - request.getNonceGroup(), request.getNonce()); + npr = new NonceProcedureRunnable(master, request.getNonceGroup(), request.getNonce()) { + @Override + protected void run() throws IOException { + setProcId(master.getLockManager().remoteLocks().requestNamespaceLock( + request.getNamespace(), type, request.getDescription(), getNonceKey())); + } + + @Override + protected String getDescription() { + return "RequestLock"; + } + }; } else { throw new IllegalArgumentException("one of table/namespace/region should be specified"); } + long procId = MasterProcedureUtil.submitProcedure(npr); return LockResponse.newBuilder().setProcId(procId).build(); } catch (IllegalArgumentException e) { LOG.warn("Exception when queuing lock", e); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 79ebca5..66758f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -93,11 +93,6 @@ public interface MasterServices extends Server { ExecutorService getExecutorService(); /** - * @return Master's instance of {@link TableLockManager} - */ - TableLockManager getTableLockManager(); - - /** * @return Master's instance of {@link TableStateManager} */ TableStateManager getTableStateManager(); http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java index 4b956e6..42a5445 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobCompactionChore.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.master.locking.LockManager; +import org.apache.hadoop.hbase.master.locking.LockProcedure; import org.apache.hadoop.hbase.mob.MobUtils; /** @@ -40,14 +42,12 @@ public class MobCompactionChore extends ScheduledChore { private static final 
Log LOG = LogFactory.getLog(MobCompactionChore.class); private HMaster master; - private TableLockManager tableLockManager; private ExecutorService pool; public MobCompactionChore(HMaster master, int period) { // use the period as initial delay. super(master.getServerName() + "-MobCompactionChore", master, period, period, TimeUnit.SECONDS); this.master = master; - this.tableLockManager = master.getTableLockManager(); this.pool = MobUtils.createMobCompactorThreadPool(master.getConfiguration()); } @@ -63,6 +63,9 @@ public class MobCompactionChore extends ScheduledChore { } boolean reported = false; try { + final LockManager.MasterLock lock = master.getLockManager().createMasterLock( + MobUtils.getTableLockName(htd.getTableName()), LockProcedure.LockType.EXCLUSIVE, + this.getClass().getName() + ": mob compaction"); for (HColumnDescriptor hcd : htd.getColumnFamilies()) { if (!hcd.isMobEnabled()) { continue; @@ -72,7 +75,7 @@ public class MobCompactionChore extends ScheduledChore { reported = true; } MobUtils.doMobCompaction(master.getConfiguration(), master.getFileSystem(), - htd.getTableName(), hcd, pool, tableLockManager, false); + htd.getTableName(), hcd, pool, false, lock); } } finally { if (reported) { http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java deleted file mode 100644 index c8eefa3..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java +++ /dev/null @@ -1,453 +0,0 @@ -/* - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.InterProcessLock; -import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler; -import org.apache.hadoop.hbase.InterProcessReadWriteLock; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.exceptions.LockTimeoutException; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock; -import org.apache.zookeeper.KeeperException; - -/** - * A manager for distributed table level locks. 
- */ -@InterfaceAudience.Private -public abstract class TableLockManager { - - private static final Log LOG = LogFactory.getLog(TableLockManager.class); - - /** Configuration key for enabling table-level locks for schema changes */ - public static final String TABLE_LOCK_ENABLE = - "hbase.table.lock.enable"; - - /** by default we should enable table-level locks for schema changes */ - private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true; - - /** Configuration key for time out for trying to acquire table locks */ - protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS = - "hbase.table.write.lock.timeout.ms"; - - /** Configuration key for time out for trying to acquire table locks */ - protected static final String TABLE_READ_LOCK_TIMEOUT_MS = - "hbase.table.read.lock.timeout.ms"; - - protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS = - 600 * 1000; //10 min default - - protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS = - 600 * 1000; //10 min default - - public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms"; - - public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS = - 600 * 1000; //10 min default - - /** - * A distributed lock for a table. - */ - @InterfaceAudience.Private - public interface TableLock { - /** - * Acquire the lock, with the configured lock timeout. - * @throws LockTimeoutException If unable to acquire a lock within a specified - * time period (if any) - * @throws IOException If unrecoverable error occurs - */ - void acquire() throws IOException; - - /** - * Release the lock already held. 
- * @throws IOException If there is an unrecoverable error releasing the lock - */ - void release() throws IOException; - } - - /** - * Returns a TableLock for locking the table for exclusive access - * @param tableName Table to lock - * @param purpose Human readable reason for locking the table - * @return A new TableLock object for acquiring a write lock - */ - public abstract TableLock writeLock(TableName tableName, String purpose); - - /** - * Returns a TableLock for locking the table for shared access among read-lock holders - * @param tableName Table to lock - * @param purpose Human readable reason for locking the table - * @return A new TableLock object for acquiring a read lock - */ - public abstract TableLock readLock(TableName tableName, String purpose); - - /** - * Visits all table locks(read and write), and lock attempts with the given callback - * MetadataHandler. - * @param handler the metadata handler to call - * @throws IOException If there is an unrecoverable error - */ - public abstract void visitAllLocks(MetadataHandler handler) throws IOException; - - /** - * Force releases all table locks(read and write) that have been held longer than - * "hbase.table.lock.expire.ms". Assumption is that the clock skew between zookeeper - * and this servers is negligible. - * The behavior of the lock holders still thinking that they have the lock is undefined. - * @throws IOException If there is an unrecoverable error - */ - public abstract void reapAllExpiredLocks() throws IOException; - - /** - * Force releases table write locks and lock attempts even if this thread does - * not own the lock. The behavior of the lock holders still thinking that they - * have the lock is undefined. This should be used carefully and only when - * we can ensure that all write-lock holders have died. For example if only - * the master can hold write locks, then we can reap it's locks when the backup - * master starts. 
- * @throws IOException If there is an unrecoverable error - */ - public abstract void reapWriteLocks() throws IOException; - - /** - * Called after a table has been deleted, and after the table lock is released. - * TableLockManager should do cleanup for the table state. - * @param tableName name of the table - * @throws IOException If there is an unrecoverable error releasing the lock - */ - public abstract void tableDeleted(TableName tableName) - throws IOException; - - /** - * Creates and returns a TableLockManager according to the configuration - */ - public static TableLockManager createTableLockManager(Configuration conf, - ZooKeeperWatcher zkWatcher, ServerName serverName) { - // Initialize table level lock manager for schema changes, if enabled. - if (conf.getBoolean(TABLE_LOCK_ENABLE, - DEFAULT_TABLE_LOCK_ENABLE)) { - long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS, - DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS); - long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS, - DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS); - long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT, - DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS); - - return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs); - } - - return new NullTableLockManager(); - } - - /** - * A null implementation - */ - @InterfaceAudience.Private - public static class NullTableLockManager extends TableLockManager { - static class NullTableLock implements TableLock { - @Override - public void acquire() throws IOException { - } - @Override - public void release() throws IOException { - } - } - @Override - public TableLock writeLock(TableName tableName, String purpose) { - return new NullTableLock(); - } - @Override - public TableLock readLock(TableName tableName, String purpose) { - return new NullTableLock(); - } - @Override - public void reapAllExpiredLocks() throws IOException { - } - @Override - public void reapWriteLocks() throws 
IOException { - } - @Override - public void tableDeleted(TableName tableName) throws IOException { - } - @Override - public void visitAllLocks(MetadataHandler handler) throws IOException { - } - } - - /** Public for hbck */ - public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) { - int pblen = ProtobufUtil.lengthOfPBMagic(); - if (bytes == null || bytes.length < pblen) { - return null; - } - try { - ZooKeeperProtos.TableLock.Builder builder = ZooKeeperProtos.TableLock.newBuilder(); - ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); - return builder.build(); - } catch (IOException ex) { - LOG.warn("Exception in deserialization", ex); - } - return null; - } - - /** - * ZooKeeper based TableLockManager - */ - @InterfaceAudience.Private - private static class ZKTableLockManager extends TableLockManager { - - private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() { - @Override - public void handleMetadata(byte[] ownerMetadata) { - if (!LOG.isDebugEnabled()) { - return; - } - ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata); - if (data == null) { - return; - } - LOG.debug("Table is locked by " + - String.format("[tableName=%s:%s, lockOwner=%s, threadId=%s, " + - "purpose=%s, isShared=%s, createTime=%s]", - data.getTableName().getNamespace().toStringUtf8(), - data.getTableName().getQualifier().toStringUtf8(), - ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(), - data.getPurpose(), data.getIsShared(), data.getCreateTime())); - } - }; - - private static class TableLockImpl implements TableLock { - long lockTimeoutMs; - TableName tableName; - InterProcessLock lock; - boolean isShared; - ZooKeeperWatcher zkWatcher; - ServerName serverName; - String purpose; - - public TableLockImpl(TableName tableName, ZooKeeperWatcher zkWatcher, - ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) { - this.tableName = tableName; - this.zkWatcher = zkWatcher; - this.serverName = 
serverName; - this.lockTimeoutMs = lockTimeoutMs; - this.isShared = isShared; - this.purpose = purpose; - } - - @Override - public void acquire() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Attempt to acquire table " + (isShared ? "read" : "write") + - " lock on: " + tableName + " for:" + purpose); - } - - lock = createTableLock(); - try { - if (lockTimeoutMs == -1) { - // Wait indefinitely - lock.acquire(); - } else { - if (!lock.tryAcquire(lockTimeoutMs)) { - throw new LockTimeoutException("Timed out acquiring " + - (isShared ? "read" : "write") + "lock for table:" + tableName + - "for:" + purpose + " after " + lockTimeoutMs + " ms."); - } - } - } catch (InterruptedException e) { - LOG.warn("Interrupted acquiring a lock for " + tableName, e); - Thread.currentThread().interrupt(); - throw new InterruptedIOException("Interrupted acquiring a lock"); - } - if (LOG.isTraceEnabled()) LOG.trace("Acquired table " + (isShared ? "read" : "write") - + " lock on " + tableName + " for " + purpose); - } - - @Override - public void release() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Attempt to release table " + (isShared ? 
"read" : "write") - + " lock on " + tableName); - } - if (lock == null) { - throw new IllegalStateException("Table " + tableName + - " is not locked!"); - } - - try { - lock.release(); - } catch (InterruptedException e) { - LOG.warn("Interrupted while releasing a lock for " + tableName); - throw new InterruptedIOException(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Released table lock on " + tableName); - } - } - - private InterProcessLock createTableLock() { - String tableLockZNode = ZKUtil.joinZNode(zkWatcher.znodePaths.tableLockZNode, - tableName.getNameAsString()); - - ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder() - .setTableName(ProtobufUtil.toProtoTableName(tableName)) - .setLockOwner(ProtobufUtil.toServerName(serverName)) - .setThreadId(Thread.currentThread().getId()) - .setPurpose(purpose) - .setIsShared(isShared) - .setCreateTime(EnvironmentEdgeManager.currentTime()).build(); - byte[] lockMetadata = toBytes(data); - - InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode, - METADATA_HANDLER); - return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata); - } - } - - private static byte[] toBytes(ZooKeeperProtos.TableLock data) { - return ProtobufUtil.prependPBMagic(data.toByteArray()); - } - - private final ServerName serverName; - private final ZooKeeperWatcher zkWatcher; - private final long writeLockTimeoutMs; - private final long readLockTimeoutMs; - private final long lockExpireTimeoutMs; - - /** - * Initialize a new manager for table-level locks. 
- * @param zkWatcher - * @param serverName Address of the server responsible for acquiring and - * releasing the table-level locks - * @param writeLockTimeoutMs Timeout (in milliseconds) for acquiring a write lock for a - * given table, or -1 for no timeout - * @param readLockTimeoutMs Timeout (in milliseconds) for acquiring a read lock for a - * given table, or -1 for no timeout - */ - public ZKTableLockManager(ZooKeeperWatcher zkWatcher, - ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) { - this.zkWatcher = zkWatcher; - this.serverName = serverName; - this.writeLockTimeoutMs = writeLockTimeoutMs; - this.readLockTimeoutMs = readLockTimeoutMs; - this.lockExpireTimeoutMs = lockExpireTimeoutMs; - } - - @Override - public TableLock writeLock(TableName tableName, String purpose) { - return new TableLockImpl(tableName, zkWatcher, - serverName, writeLockTimeoutMs, false, purpose); - } - - public TableLock readLock(TableName tableName, String purpose) { - return new TableLockImpl(tableName, zkWatcher, - serverName, readLockTimeoutMs, true, purpose); - } - - public void visitAllLocks(MetadataHandler handler) throws IOException { - for (String tableName : getTableNames()) { - String tableLockZNode = ZKUtil.joinZNode(zkWatcher.znodePaths.tableLockZNode, tableName); - ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock( - zkWatcher, tableLockZNode, null); - lock.readLock(null).visitLocks(handler); - lock.writeLock(null).visitLocks(handler); - } - } - - private List<String> getTableNames() throws IOException { - - List<String> tableNames; - try { - tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.znodePaths.tableLockZNode); - } catch (KeeperException e) { - LOG.error("Unexpected ZooKeeper error when listing children", e); - throw new IOException("Unexpected ZooKeeper exception", e); - } - return tableNames; - } - - @Override - public void reapWriteLocks() throws IOException { - //get the table names - 
try { - for (String tableName : getTableNames()) { - String tableLockZNode = ZKUtil.joinZNode(zkWatcher.znodePaths.tableLockZNode, tableName); - ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock( - zkWatcher, tableLockZNode, null); - lock.writeLock(null).reapAllLocks(); - } - } catch (IOException ex) { - throw ex; - } catch (Exception ex) { - LOG.warn("Caught exception while reaping table write locks", ex); - } - } - - @Override - public void reapAllExpiredLocks() throws IOException { - //get the table names - try { - for (String tableName : getTableNames()) { - String tableLockZNode = ZKUtil.joinZNode(zkWatcher.znodePaths.tableLockZNode, tableName); - ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock( - zkWatcher, tableLockZNode, null); - lock.readLock(null).reapExpiredLocks(lockExpireTimeoutMs); - lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs); - } - } catch (IOException ex) { - throw ex; - } catch (Exception ex) { - throw new IOException(ex); - } - } - - @Override - public void tableDeleted(TableName tableName) throws IOException { - //table write lock from DeleteHandler is already released, just delete the parent znode - String tableNameStr = tableName.getNameAsString(); - String tableLockZNode = ZKUtil.joinZNode(zkWatcher.znodePaths.tableLockZNode, tableNameStr); - try { - ZKUtil.deleteNode(zkWatcher, tableLockZNode); - } catch (KeeperException ex) { - if (ex.code() == KeeperException.Code.NOTEMPTY) { - //we might get this in rare occasions where a CREATE table or some other table operation - //is waiting to acquire the lock. In this case, parent znode won't be deleted. 
- LOG.warn("Could not delete the znode for table locks because NOTEMPTY: " - + tableLockZNode); - return; - } - throw new IOException(ex); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockManager.java index 8f99f5e..b72e219 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockManager.java @@ -63,9 +63,8 @@ public final class LockManager { return new MasterLock(regionInfos, description); } - private void submitProcedure(final LockProcedure proc, final long nonceGroup, final long nonce) { + private void submitProcedure(final LockProcedure proc, final NonceKey nonceKey) { proc.setOwner(master.getMasterProcedureExecutor().getEnvironment().getRequestUser()); - final NonceKey nonceKey = master.getMasterProcedureExecutor().createNonceKey(nonceGroup, nonce); master.getMasterProcedureExecutor().submitProcedure(proc, nonceKey); } @@ -205,27 +204,23 @@ public final class LockManager { */ public class RemoteLocks { public long requestNamespaceLock(final String namespace, final LockProcedure.LockType type, - final String description, final long nonceGroup, final long nonce) + final String description, final NonceKey nonceKey) throws IllegalArgumentException, IOException { master.getMasterCoprocessorHost().preRequestLock(namespace, null, null, type, description); - final LockProcedure proc = new LockProcedure(master.getConfiguration(), namespace, type, description, null); - submitProcedure(proc, nonceGroup, nonce); - + submitProcedure(proc, nonceKey); master.getMasterCoprocessorHost().postRequestLock(namespace, null, 
null, type, description); return proc.getProcId(); } public long requestTableLock(final TableName tableName, final LockProcedure.LockType type, - final String description, final long nonceGroup, final long nonce) + final String description, final NonceKey nonceKey) throws IllegalArgumentException, IOException { master.getMasterCoprocessorHost().preRequestLock(null, tableName, null, type, description); - final LockProcedure proc = new LockProcedure(master.getConfiguration(), tableName, type, description, null); - submitProcedure(proc, nonceGroup, nonce); - + submitProcedure(proc, nonceKey); master.getMasterCoprocessorHost().postRequestLock(null, tableName, null, type, description); return proc.getProcId(); } @@ -234,14 +229,13 @@ public final class LockManager { * @throws IllegalArgumentException if all regions are not from same table. */ public long requestRegionsLock(final HRegionInfo[] regionInfos, final String description, - final long nonceGroup, final long nonce) throws IllegalArgumentException, IOException { + final NonceKey nonceKey) + throws IllegalArgumentException, IOException { master.getMasterCoprocessorHost().preRequestLock(null, null, regionInfos, LockProcedure.LockType.EXCLUSIVE, description); - final LockProcedure proc = new LockProcedure(master.getConfiguration(), regionInfos, LockProcedure.LockType.EXCLUSIVE, description, null); - submitProcedure(proc, nonceGroup, nonce); - + submitProcedure(proc, nonceKey); master.getMasterCoprocessorHost().postRequestLock(null, null, regionInfos, LockProcedure.LockType.EXCLUSIVE, description); return proc.getProcId(); http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java 
index f793a65..1a1c8c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java @@ -1,4 +1,5 @@ /** + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 9362f24..353342a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -98,8 +98,7 @@ public class MasterProcedureEnv implements ConfigurationObserver { public MasterProcedureEnv(final MasterServices master) { this.master = master; - this.procSched = new MasterProcedureScheduler(master.getConfiguration(), - master.getTableLockManager()); + this.procSched = new MasterProcedureScheduler(master.getConfiguration()); } public User getRequestUser() { http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 3f588ff..b9b7b59 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.procedure; import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; import java.util.HashMap; @@ -35,8 +34,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; import org.apache.hadoop.hbase.procedure2.Procedure; @@ -67,8 +64,6 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; public class MasterProcedureScheduler extends AbstractProcedureScheduler { private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class); - private final TableLockManager lockManager; - private final static NamespaceQueueKeyComparator NAMESPACE_QUEUE_KEY_COMPARATOR = new NamespaceQueueKeyComparator(); private final static ServerQueueKeyComparator SERVER_QUEUE_KEY_COMPARATOR = @@ -87,9 +82,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private final int userTablePriority; private final int sysTablePriority; - public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) { - this.lockManager = lockManager; - + public MasterProcedureScheduler(final Configuration conf) { // TODO: should this be part of the HTD? 
metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); @@ -456,7 +449,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private final NamespaceQueue namespaceQueue; private HashMap<String, RegionEvent> regionEventMap; - private TableLock tableLock = null; public TableQueue(TableName tableName, NamespaceQueue namespaceQueue, int priority) { super(tableName, priority); @@ -544,65 +536,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType()); } - - private synchronized boolean tryZkSharedLock(final TableLockManager lockManager, - final String purpose) { - // Since we only have one lock resource. We should only acquire zk lock if the znode - // does not exist. - // - if (isSingleSharedLock()) { - // Take zk-read-lock - TableName tableName = getKey(); - tableLock = lockManager.readLock(tableName, purpose); - try { - tableLock.acquire(); - } catch (IOException e) { - LOG.error("failed acquire read lock on " + tableName, e); - tableLock = null; - return false; - } - } - return true; - } - - private synchronized void releaseZkSharedLock(final TableLockManager lockManager) { - if (isSingleSharedLock()) { - releaseTableLock(lockManager, true); - } - } - - private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager, - final String purpose) { - // Take zk-write-lock - TableName tableName = getKey(); - tableLock = lockManager.writeLock(tableName, purpose); - try { - tableLock.acquire(); - } catch (IOException e) { - LOG.error("failed acquire write lock on " + tableName, e); - tableLock = null; - return false; - } - return true; - } - - private synchronized void releaseZkExclusiveLock(final TableLockManager lockManager) { - releaseTableLock(lockManager, true); - } - - private void 
releaseTableLock(final TableLockManager lockManager, boolean reset) { - for (int i = 0; i < 3; ++i) { - try { - tableLock.release(); - if (reset) { - tableLock = null; - } - break; - } catch (IOException e) { - LOG.warn("Could not release the table write-lock", e); - } - } - } } private static class NamespaceQueueKeyComparator implements AvlKeyComparator<NamespaceQueue> { @@ -665,35 +598,22 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { */ public boolean tryAcquireTableExclusiveLock(final Procedure procedure, final TableName table) { schedLock(); - TableQueue queue = getTableQueue(table); - if (!queue.getNamespaceQueue().trySharedLock()) { - schedUnlock(); - return false; - } - - if (!queue.tryExclusiveLock(procedure)) { - queue.getNamespaceQueue().releaseSharedLock(); - schedUnlock(); - return false; - } - - removeFromRunQueue(tableRunQueue, queue); - boolean hasParentLock = queue.hasParentLock(procedure); - schedUnlock(); + try { + final TableQueue queue = getTableQueue(table); + if (!queue.getNamespaceQueue().trySharedLock()) { + return false; + } - boolean hasXLock = true; - if (!hasParentLock) { - // Zk lock is expensive... 
- hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString()); - if (!hasXLock) { - schedLock(); - if (!hasParentLock) queue.releaseExclusiveLock(procedure); + if (!queue.tryExclusiveLock(procedure)) { queue.getNamespaceQueue().releaseSharedLock(); - addToRunQueue(tableRunQueue, queue); - schedUnlock(); + return false; } + + removeFromRunQueue(tableRunQueue, queue); + return true; + } finally { + schedUnlock(); } - return hasXLock; } /** @@ -702,19 +622,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param table the name of the table that has the exclusive lock */ public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) { - final TableQueue queue = getTableQueueWithLock(table); - final boolean hasParentLock = queue.hasParentLock(procedure); - - if (!hasParentLock) { - // Zk lock is expensive... - queue.releaseZkExclusiveLock(lockManager); - } - schedLock(); - if (!hasParentLock) queue.releaseExclusiveLock(procedure); - queue.getNamespaceQueue().releaseSharedLock(); - addToRunQueue(tableRunQueue, queue); - schedUnlock(); + try { + final TableQueue queue = getTableQueue(table); + if (!queue.hasParentLock(procedure)) { + queue.releaseExclusiveLock(procedure); + } + queue.getNamespaceQueue().releaseSharedLock(); + addToRunQueue(tableRunQueue, queue); + } finally { + schedUnlock(); + } } /** @@ -731,29 +649,21 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private TableQueue tryAcquireTableQueueSharedLock(final Procedure procedure, final TableName table) { schedLock(); - TableQueue queue = getTableQueue(table); - if (!queue.getNamespaceQueue().trySharedLock()) { - return null; - } + try { + final TableQueue queue = getTableQueue(table); + if (!queue.getNamespaceQueue().trySharedLock()) { + return null; + } - if (!queue.trySharedLock()) { - queue.getNamespaceQueue().releaseSharedLock(); - schedUnlock(); - return null; - } + if (!queue.trySharedLock()) { + 
queue.getNamespaceQueue().releaseSharedLock(); + return null; + } - // TODO: Zk lock is expensive and it would be perf bottleneck. Long term solution is - // to remove it. - if (!queue.tryZkSharedLock(lockManager, procedure.toString())) { - queue.releaseSharedLock(); - queue.getNamespaceQueue().releaseSharedLock(); + return queue; + } finally { schedUnlock(); - return null; } - - schedUnlock(); - - return queue; } /** @@ -762,17 +672,16 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param table the name of the table that has the shared lock */ public void releaseTableSharedLock(final Procedure procedure, final TableName table) { - final TableQueue queue = getTableQueueWithLock(table); - schedLock(); - // Zk lock is expensive... - queue.releaseZkSharedLock(lockManager); - - queue.getNamespaceQueue().releaseSharedLock(); - if (queue.releaseSharedLock()) { - addToRunQueue(tableRunQueue, queue); + try { + final TableQueue queue = getTableQueue(table); + if (queue.releaseSharedLock()) { + addToRunQueue(tableRunQueue, queue); + } + queue.getNamespaceQueue().releaseSharedLock(); + } finally { + schedUnlock(); } - schedUnlock(); } /** @@ -796,14 +705,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { if (AvlIterableList.isLinked(queue)) { tableRunQueue.remove(queue); } - - // Remove the table lock - try { - lockManager.tableDeleted(table); - } catch (IOException e) { - LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical - } - removeTableQueue(table); } else { // TODO: If there are no create, we can drop all the other ops http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java index 9706107..62cb0a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java @@ -141,4 +141,4 @@ public final class MasterProcedureUtil { } return runnable.getProcId(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java index a0b6d25..992f28e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java @@ -44,8 +44,8 @@ import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MetricsSnapshot; import org.apache.hadoop.hbase.master.SnapshotSentinel; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.master.locking.LockManager; +import org.apache.hadoop.hbase.master.locking.LockProcedure; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; @@ -83,8 +83,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh protected final Path workingDir; private final MasterSnapshotVerifier verifier; protected final 
ForeignExceptionDispatcher monitor; - protected final TableLockManager tableLockManager; - protected final TableLock tableLock; + protected final LockManager.MasterLock tableLock; protected final MonitoredTask status; protected final TableName snapshotTable; protected final SnapshotManifest snapshotManifest; @@ -114,10 +113,9 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh this.monitor = new ForeignExceptionDispatcher(snapshot.getName()); this.snapshotManifest = SnapshotManifest.create(conf, fs, workingDir, snapshot, monitor); - this.tableLockManager = master.getTableLockManager(); - this.tableLock = this.tableLockManager.writeLock( - snapshotTable, - EventType.C_M_SNAPSHOT_TABLE.toString()); + this.tableLock = master.getLockManager().createMasterLock( + snapshotTable, LockProcedure.LockType.EXCLUSIVE, + this.getClass().getName() + ": take snapshot " + snapshot.getName()); // prepare the verify this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, rootDir); @@ -138,18 +136,14 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh public TakeSnapshotHandler prepare() throws Exception { super.prepare(); - this.tableLock.acquire(); // after this, you should ensure to release this lock in - // case of exceptions - boolean success = false; + // after this, you should ensure to release this lock in case of exceptions + this.tableLock.acquire(); try { this.htd = loadTableDescriptor(); // check that .tableinfo is present - success = true; - } finally { - if (!success) { - releaseTableLock(); - } + } catch (Exception e) { + this.tableLock.release(); + throw e; } - return this; } @@ -234,17 +228,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh LOG.error("Couldn't delete snapshot working directory:" + workingDir); } lock.unlock(); - releaseTableLock(); - } - } - - protected void releaseTableLock() { - if (this.tableLock != null) { - try { - 
this.tableLock.release(); - } catch (IOException ex) { - LOG.warn("Could not release the table lock", ex); - } + tableLock.release(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index 770c069..2592812 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -61,8 +61,8 @@ import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.mob.compactions.MobCompactor; import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor; import org.apache.hadoop.hbase.regionserver.BloomType; @@ -699,12 +699,11 @@ public final class MobUtils { * @param tableName the table the compact * @param hcd the column descriptor * @param pool the thread pool - * @param tableLockManager the tableLock manager * @param allFiles Whether add all mob files into the compaction. 
*/ public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName, - HColumnDescriptor hcd, ExecutorService pool, TableLockManager tableLockManager, - boolean allFiles) throws IOException { + HColumnDescriptor hcd, ExecutorService pool, boolean allFiles, LockManager.MasterLock lock) + throws IOException { String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY, PartitionedMobCompactor.class.getName()); // instantiate the mob compactor. @@ -719,29 +718,14 @@ public final class MobUtils { // compact only for mob-enabled column. // obtain a write table lock before performing compaction to avoid race condition // with major compaction in mob-enabled column. - boolean tableLocked = false; - TableLock lock = null; try { - // the tableLockManager might be null in testing. In that case, it is lock-free. - if (tableLockManager != null) { - lock = tableLockManager.writeLock(MobUtils.getTableLockName(tableName), - "Run MobCompactor"); - lock.acquire(); - } - tableLocked = true; + lock.acquire(); compactor.compact(allFiles); } catch (Exception e) { LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString() - + " in the table " + tableName.getNameAsString(), e); + + " in the table " + tableName.getNameAsString(), e); } finally { - if (lock != null && tableLocked) { - try { - lock.release(); - } catch (IOException e) { - LOG.error( - "Failed to release the write lock for the table " + tableName.getNameAsString(), e); - } - } + lock.release(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 63929a8..6870445 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -80,7 +80,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi public static final String REGION_SERVER_REGION_SPLIT_LIMIT = "hbase.regionserver.regionSplitLimit"; public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; - + private final HRegionServer server; private final Configuration conf; http://git-wip-us.apache.org/repos/asf/hbase/blob/76dc957f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 0bf6c9a..6ffa459 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.regionserver; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.TableName; @@ -45,6 +48,7 @@ import 
org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.locking.EntityLock; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.io.compress.Compression; @@ -52,8 +56,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.mob.MobCacheConfig; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobFile; @@ -100,8 +102,6 @@ public class HMobStore extends HStore { private volatile long mobScanCellsCount = 0; private volatile long mobScanCellsSize = 0; private HColumnDescriptor family; - private TableLockManager tableLockManager; - private TableName tableLockName; private Map<String, List<Path>> map = new ConcurrentHashMap<String, List<Path>>(); private final IdLock keyLock = new IdLock(); // When we add a MOB reference cell to the HFile, we will add 2 tags along with it @@ -126,10 +126,6 @@ public class HMobStore extends HStore { locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn) .getEncodedName(), family.getNameAsString())); map.put(Bytes.toString(tn.getName()), locations); - if (region.getRegionServerServices() != null) { - tableLockManager = region.getRegionServerServices().getTableLockManager(); - tableLockName = MobUtils.getTableLockName(getTableName()); - } List<Tag> tags = new ArrayList<>(2); tags.add(MobConstants.MOB_REF_TAG); Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, @@ -482,39 +478,39 @@ public 
class HMobStore extends HStore { // Acquire a table lock to coordinate. // 1. If no, mark the major compaction as retainDeleteMarkers and continue the compaction. // 2. If the lock is obtained, run the compaction directly. - TableLock lock = null; - if (tableLockManager != null) { - lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore"); - } - boolean tableLocked = false; - String tableName = getTableName().getNameAsString(); - if (lock != null) { - try { - LOG.info("Start to acquire a read lock for the table[" + tableName - + "], ready to perform the major compaction"); - lock.acquire(); - tableLocked = true; - } catch (Exception e) { - LOG.error("Fail to lock the table " + tableName, e); - } - } else { - // If the tableLockManager is null, mark the tableLocked as true. - tableLocked = true; - } + EntityLock lock = null; try { - if (!tableLocked) { - LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table[" - + tableName + "], forcing the delete markers to be retained"); - compaction.getRequest().forceRetainDeleteMarkers(); + if (region.getRegionServerServices() != null) { + List<HRegionInfo> regionInfos = Collections.singletonList(region.getRegionInfo()); + // regionLock takes shared lock on table too. 
+ lock = region.getRegionServerServices().regionLock(regionInfos, "MOB compaction", null); + int awaitTime = conf.getInt(HRegionServer.REGION_LOCK_AWAIT_TIME_SEC, + HRegionServer.DEFAULT_REGION_LOCK_AWAIT_TIME_SEC); + try { + LOG.info("Acquiring MOB major compaction lock " + lock); + lock.requestLock(); + lock.await(awaitTime, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.error("Interrupted exception when waiting for lock: " + lock, e); + } + if (!lock.isLocked()) { + // Remove lock from queue on the master so that if it's granted in future, we don't + // keep holding it until compaction finishes + lock.unlock(); + lock = null; + LOG.warn("Cannot obtain table lock, maybe a sweep tool is running on this " + "table[" + + getTableName() + "], forcing the delete markers to be retained"); + } + } else { + LOG.warn("Cannot obtain lock because RegionServices not available. Are we running as " + + "compaction tool?"); } + // If no lock, retain delete markers to be safe. + if (lock == null) compaction.getRequest().forceRetainDeleteMarkers(); return super.compact(compaction, throughputController, user); } finally { - if (tableLocked && lock != null) { - try { - lock.release(); - } catch (IOException e) { - LOG.error("Fail to release the table lock " + tableName, e); - } + if (lock != null && lock.isLocked()) { + lock.unlock(); } } } else {