Repository: hbase Updated Branches: refs/heads/master f78284685 -> c448604ce
HBASE-21565 Delete dead server from dead server list too early leads to concurrent Server Crash Procedures(SCP) for a same server Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c448604c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c448604c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c448604c Branch: refs/heads/master Commit: c448604ceb987d113913f0583452b2abce04db0d Parents: f782846 Author: Jingyun Tian <tianjy1...@gmail.com> Authored: Mon Dec 17 19:32:23 2018 +0800 Committer: Jingyun Tian <tia...@apache.org> Committed: Tue Dec 18 16:57:11 2018 +0800 ---------------------------------------------------------------------- .../hbase/master/RegionServerTracker.java | 3 + .../hadoop/hbase/master/ServerManager.java | 25 ++++---- .../master/assignment/AssignmentManager.java | 28 ++++++--- .../hbase/master/assignment/RegionStates.java | 3 +- .../hbase/master/assignment/ServerState.java | 2 +- .../master/assignment/ServerStateNode.java | 2 +- .../master/procedure/ServerCrashProcedure.java | 16 ++--- .../hadoop/hbase/HBaseTestingUtility.java | 7 ++- .../hadoop/hbase/master/TestRestartCluster.java | 65 ++++++++++++++++++++ .../procedure/TestServerCrashProcedure.java | 38 ++++++++++++ 10 files changed, 155 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java index f419732..9d33a21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java @@ -128,6 +128,9 @@ public class RegionServerTracker extends ZKListener { // '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not. splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)). forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s)); + //create ServerNode for all possible live servers from wal directory + liveServersFromWALDir.stream() + .forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn)); watcher.registerListener(this); synchronized (this) { List<String> servers = http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index dc76d72..86d72d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -602,19 +602,22 @@ public class ServerManager { return false; } LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName()); - master.getAssignmentManager().submitServerCrash(serverName, true); - - // Tell our listeners that a server was removed - if (!this.listeners.isEmpty()) { - for (ServerListener listener : this.listeners) { - listener.serverRemoved(serverName); + long pid = master.getAssignmentManager().submitServerCrash(serverName, true); + if(pid <= 0) { + return false; + } else { + // Tell our listeners that a server was removed + if (!this.listeners.isEmpty()) { + for (ServerListener listener : this.listeners) { + listener.serverRemoved(serverName); + } } + // trigger a persist of flushedSeqId + if (flushedSeqIdFlusher != null) { + flushedSeqIdFlusher.triggerNow(); + } + return true; } - // trigger a persist of flushedSeqId - if (flushedSeqIdFlusher != null) { - flushedSeqIdFlusher.triggerNow(); - } - return true; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index a564ea9..b7c2203 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -1343,24 +1343,36 @@ public class AssignmentManager { public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) { boolean carryingMeta; long pid; - ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); + ServerStateNode serverNode = regionStates.getServerNode(serverName); + if(serverNode == null){ + LOG.info("Skip to add SCP for {} since this server should be OFFLINE already", serverName); + return -1; + } // we hold the write lock here for fencing on reportRegionStateTransition. Once we set the // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from // this server. This is used to simplify the implementation for TRSP and SCP, where we can make // sure that, the region list fetched by SCP will not be changed any more. serverNode.writeLock().lock(); try { - serverNode.setState(ServerState.CRASHED); - carryingMeta = isCarryingMeta(serverName); ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor(); - pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName, - shouldSplitWal, carryingMeta)); + carryingMeta = isCarryingMeta(serverName); + if (!serverNode.isInState(ServerState.ONLINE)) { + LOG.info( + "Skip to add SCP for {} with meta= {}, " + + "since there should be a SCP is processing or already done for this server node", + serverName, carryingMeta); + return -1; + } else { + serverNode.setState(ServerState.CRASHED); + pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), + serverName, shouldSplitWal, carryingMeta)); + LOG.info( + "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}", + serverName, carryingMeta, pid); + } } finally { serverNode.writeLock().unlock(); } - LOG.info( - "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}", - serverName, carryingMeta, pid); return pid; } http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java index 7b85409..1470a5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java @@ -738,7 +738,8 @@ public class RegionStates { serverMap.remove(serverName); } - ServerStateNode getServerNode(final ServerName serverName) { + @VisibleForTesting + public ServerStateNode getServerNode(final ServerName serverName) { return serverMap.get(serverName); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java index 3efe6e2..c86a60e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java @@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience; * Server State. */ @InterfaceAudience.Private -enum ServerState { +public enum ServerState { /** * Initial state. Available. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java index 6f763aa..11883db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java @@ -32,7 +32,7 @@ import org.apache.yetus.audience.InterfaceAudience; * State of Server; list of hosted regions, etc. */ @InterfaceAudience.Private -class ServerStateNode implements Comparable<ServerStateNode> { +public class ServerStateNode implements Comparable<ServerStateNode> { private final Set<RegionStateNode> regions; private final ServerName serverName; http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index b93f8fa..05bcd28 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -333,17 +333,6 @@ public class ServerCrashProcedure return ServerOperationType.CRASH_HANDLER; } - /** - * For this procedure, yield at end of each successful flow step so that all crashed servers - * can make progress rather than do the default which has each procedure running to completion - * before we move to the next. For crashed servers, especially if running with distributed log - * replay, we will want all servers to come along; we do not want the scenario where a server is - * stuck waiting for regions to online so it can replay edits. - */ - @Override - protected boolean isYieldBeforeExecuteFromState(MasterProcedureEnv env, ServerCrashState state) { - return true; - } @Override protected boolean shouldWaitClientAck(MasterProcedureEnv env) { @@ -390,4 +379,9 @@ public class ServerCrashProcedure protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) { return env.getMasterServices().getMasterMetrics().getServerCrashProcMetrics(); } + + @Override + protected boolean holdLock(MasterProcedureEnv env) { + return true; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 0cd5a22..7bfbfe1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1187,6 +1187,11 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { * @param servers number of region servers */ public void restartHBaseCluster(int servers) throws IOException, InterruptedException { + this.restartHBaseCluster(servers, null); + } + + public void restartHBaseCluster(int servers, List<Integer> ports) + throws IOException, InterruptedException { if (hbaseAdmin != null) { hbaseAdmin.close(); hbaseAdmin = null; @@ -1195,7 +1200,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { this.connection.close(); this.connection = null; } - this.hbaseCluster = new MiniHBaseCluster(this.conf, servers); + this.hbaseCluster = new MiniHBaseCluster(this.conf, 1, servers, ports, null, null); // Don't leave here till we've done a successful scan of the hbase:meta Connection conn = ConnectionFactory.createConnection(this.conf); Table t = conn.getTable(TableName.META_TABLE_NAME); http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java index 4ba1876..e55e375 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -33,12 +35,18 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.assignment.ServerState; +import org.apache.hadoop.hbase.master.assignment.ServerStateNode; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; import org.junit.After; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -67,6 +75,63 @@ public class TestRestartCluster { } @Test + public void testClusterRestartFailOver() throws Exception { + UTIL.startMiniCluster(3); + UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized()); + //wait for all SCPs finished + UTIL.waitFor(20000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream() + .noneMatch(p -> p instanceof ServerCrashProcedure)); + TableName tableName = TABLES[0]; + ServerName testServer = UTIL.getHBaseCluster().getRegionServer(0).getServerName(); + ServerStateNode serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionStates().getServerNode(testServer); + Assert.assertNotNull(serverNode); + Assert.assertTrue("serverNode should be ONLINE when cluster runs normally", + serverNode.isInState(ServerState.ONLINE)); + UTIL.createMultiRegionTable(tableName, FAMILY); + UTIL.waitTableEnabled(tableName); + Table table = UTIL.getConnection().getTable(tableName); + for (int i = 0; i < 100; i++) { + UTIL.loadTable(table, FAMILY); + } + List<Integer> ports = + UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream() + .map(serverName -> serverName.getPort()).collect(Collectors.toList()); + LOG.info("Shutting down cluster"); + UTIL.getHBaseCluster().killAll(); + UTIL.getHBaseCluster().waitUntilShutDown(); + LOG.info("Starting cluster the second time"); + UTIL.restartHBaseCluster(3, ports); + UTIL.waitFor(10000, () -> UTIL.getHBaseCluster().getMaster().isInitialized()); + serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates() + .getServerNode(testServer); + Assert.assertNotNull("serverNode should not be null when restart whole cluster", serverNode); + Assert.assertFalse(serverNode.isInState(ServerState.ONLINE)); + LOG.info("start to find the procedure of SCP for the severName we choose"); + UTIL.waitFor(20000, + () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream() + .anyMatch(procedure -> (procedure instanceof ServerCrashProcedure) + && ((ServerCrashProcedure) procedure).getServerName().equals(testServer))); + Assert.assertFalse("serverNode should not be ONLINE during SCP processing", + serverNode.isInState(ServerState.ONLINE)); + LOG.info("start to submit the SCP for the same serverName {} which should fail", testServer); + Assert.assertFalse( + UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer)); + Procedure procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream() + .filter(p -> (p instanceof ServerCrashProcedure) + && ((ServerCrashProcedure) p).getServerName().equals(testServer)) + .findAny().get(); + UTIL.waitFor(20000, () -> procedure.isFinished()); + LOG.info("even when the SCP is finished, the duplicate SCP should not be scheduled for {}", + testServer); + Assert.assertFalse( + UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer)); + serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates() + .getServerNode(testServer); + Assert.assertNull("serverNode should be deleted after SCP finished", serverNode); + } + + @Test public void testClusterRestart() throws Exception { UTIL.startMiniCluster(3); while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java index 0e4a84b..af2076e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; @@ -170,6 +171,43 @@ public class TestServerCrashProcedure { } } + @Test + public void testConcurrentSCPForSameServer() throws Exception { + final TableName tableName = TableName.valueOf("testConcurrentSCPForSameServer"); + try (Table t = createTable(tableName)) { + // Load the table with a bit of data so some logs to split and some edits in each region. + this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]); + final int count = util.countRows(t); + assertTrue("expected some rows", count > 0); + // find the first server that match the request and executes the test + ServerName rsToKill = null; + for (RegionInfo hri : util.getAdmin().getRegions(tableName)) { + final ServerName serverName = AssignmentTestingUtil.getServerHoldingRegion(util, hri); + if (AssignmentTestingUtil.isServerHoldingMeta(util, serverName) == true) { + rsToKill = serverName; + break; + } + } + HMaster master = util.getHBaseCluster().getMaster(); + final ProcedureExecutor<MasterProcedureEnv> pExecutor = master.getMasterProcedureExecutor(); + ServerCrashProcedure procB = + new ServerCrashProcedure(pExecutor.getEnvironment(), rsToKill, false, false); + AssignmentTestingUtil.killRs(util, rsToKill); + long procId = getSCPProcId(pExecutor); + Procedure procA = pExecutor.getProcedure(procId); + LOG.info("submit SCP procedureA"); + util.waitFor(5000, () -> procA.hasLock()); + LOG.info("procedureA acquired the lock"); + assertEquals(Procedure.LockState.LOCK_EVENT_WAIT, + procB.acquireLock(pExecutor.getEnvironment())); + LOG.info("procedureB should not be able to get the lock"); + util.waitFor(60000, + () -> procB.acquireLock(pExecutor.getEnvironment()) == Procedure.LockState.LOCK_ACQUIRED); + LOG.info("when procedure B get the lock, procedure A should be finished"); + assertTrue(procA.isFinished()); + } + } + protected void assertReplicaDistributed(final Table t) { return; }