HBASE-20727 Persist FlushedSequenceId to speed up WAL split after cluster restart
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b336da92
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b336da92
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b336da92

Branch: refs/heads/HBASE-19064
Commit: b336da925ac5c5ee3565112de4b808fe2eed08a2
Parents: 78da0e3
Author: Allan Yang <allan...@apache.org>
Authored: Tue Jun 19 09:45:47 2018 +0800
Committer: Allan Yang <allan...@163.com>
Committed: Tue Jun 19 09:45:47 2018 +0800

----------------------------------------------------------------------
 .../src/main/protobuf/HBase.proto               |  15 ++
 .../org/apache/hadoop/hbase/master/HMaster.java |   8 +
 .../hadoop/hbase/master/ServerManager.java      | 213 ++++++++++++++++++-
 .../apache/hadoop/hbase/master/TestMaster.java  |  33 +++
 4 files changed, 268 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b336da92/hbase-protocol-shaded/src/main/protobuf/HBase.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/HBase.proto b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
index 29067f1..0af2ffd 100644
--- a/hbase-protocol-shaded/src/main/protobuf/HBase.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
@@ -252,4 +252,19 @@ message CacheEvictionStats {
   optional int64 bytes_evicted = 2;
   optional int64 max_cache_size = 3;
   repeated RegionExceptionMessage exception = 4;
+}
+
+message FlushedStoreSequenceId {
+  required bytes family = 1;
+  required uint64 seqId = 2;
+}
+
+message FlushedRegionSequenceId {
+  required bytes regionEncodedName = 1;
+  required uint64 seqId = 2;
+  repeated FlushedStoreSequenceId stores = 3;
+}
+
+message FlushedSequenceId {
+  repeated FlushedRegionSequenceId regionSequenceId = 1;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/b336da92/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 883bb4f..38aac50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -862,6 +862,13 @@ public class HMaster extends HRegionServer implements MasterServices {
     status.setStatus("Initializing ZK system trackers");
     initializeZKBasedSystemTrackers();
 
+    status.setStatus("Loading last flushed sequence id of regions");
+    try {
+      this.serverManager.loadLastFlushedSequenceIds();
+    } catch (IOException e) {
+      LOG.debug("Failed to load last flushed sequence id of regions"
+          + " from file system", e);
+    }
     // Set ourselves as active Master now our claim has succeeded up in zk.
     this.activeMaster = true;
@@ -946,6 +953,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     getChoreService().scheduleChore(normalizerChore);
     this.catalogJanitorChore = new CatalogJanitor(this);
     getChoreService().scheduleChore(catalogJanitorChore);
+    this.serverManager.startChore();
 
     status.setStatus("Starting cluster schema service");
     initClusterSchemaService();
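
The three new protobuf messages form a simple hierarchy: FlushedSequenceId holds one FlushedRegionSequenceId per region, which carries the region-level seqId plus one FlushedStoreSequenceId per column family. A minimal sketch of building one entry with the generated shaded classes (the class and builder names come from the diff above; the sample region name, family, and sequence ids are made-up values):

    import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;

    public class FlushedSeqIdMessageSketch {
      public static void main(String[] args) {
        // one store (column family) under one region; values are examples only
        FlushedStoreSequenceId store = FlushedStoreSequenceId.newBuilder()
            .setFamily(ByteString.copyFromUtf8("cf"))
            .setSeqId(42L)
            .build();
        FlushedRegionSequenceId region = FlushedRegionSequenceId.newBuilder()
            .setRegionEncodedName(ByteString.copyFromUtf8("1588230740"))
            .setSeqId(42L)
            .addStores(store)
            .build();
        FlushedSequenceId all = FlushedSequenceId.newBuilder()
            .addRegionSequenceId(region)
            .build();
        System.out.println(all);
      }
    }
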
http://git-wip-us.apache.org/repos/asf/hbase/blob/b336da92/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index c746502..cfbd52f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -38,10 +38,15 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClockOutOfSyncException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerMetricsBuilder;
 import org.apache.hadoop.hbase.ServerName;
@@ -51,9 +56,11 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -62,12 +69,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 
 /**
@@ -106,6 +117,22 @@ public class ServerManager {
   public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
     "hbase.master.wait.on.regionservers.interval";
 
+  /**
+   * See HBASE-20727.
+   * If set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion
+   * will be persisted to HDFS and loaded when the master restarts, to speed up log split.
+   */
+  public static final String PERSIST_FLUSHEDSEQUENCEID =
+      "hbase.master.persist.flushedsequenceid.enabled";
+
+  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;
+
+  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
+      "hbase.master.flushedsequenceid.flusher.interval";
+
+  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT =
+      3 * 60 * 60 * 1000; // 3 hours
+
   private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);
 
   // Set if we are to shutdown the cluster.
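
Both new keys are read by ServerManager: the boolean turns the feature on or off (on by default), and the interval controls how often the flusher chore persists the map. A small sketch of overriding them programmatically, assuming nothing beyond stock HBaseConfiguration; the 30-minute interval is an arbitrary example value:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class FlushedSeqIdConfigSketch {
      public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        // enabled by default; set to false to fall back to rebuilding the
        // flushed-sequence-id map purely from regionserver reports
        conf.setBoolean("hbase.master.persist.flushedsequenceid.enabled", true);
        // persist every 30 minutes instead of the default 3 hours (example value)
        conf.setInt("hbase.master.flushedsequenceid.flusher.interval", 30 * 60 * 1000);
        return conf;
      }
    }

The same keys can equally be set in hbase-site.xml.
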
"hbase.master.wait.on.regionservers.interval"; + /** + * see HBASE-20727 + * if set to true, flushedSequenceIdByRegion and storeFlushedSequenceIdsByRegion + * will be persisted to HDFS and loaded when master restart to speed up log split + */ + public static final String PERSIST_FLUSHEDSEQUENCEID = + "hbase.master.persist.flushedsequenceid.enabled"; + + public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true; + + public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL = + "hbase.master.flushedsequenceid.flusher.interval"; + + public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT = + 3 * 60 * 60 * 1000; // 3 hours + private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class); // Set if we are to shutdown the cluster. @@ -117,6 +144,13 @@ public class ServerManager { private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); + private boolean persistFlushedSequenceId = true; + private volatile boolean isFlushSeqIdPersistInProgress = false; + /** File on hdfs to store last flushed sequence id of regions */ + private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids"; + private FlushedSequenceIdFlusher flushedSeqIdFlusher; + + /** * The last flushed sequence id for a store in a region. */ @@ -194,6 +228,8 @@ public class ServerManager { warningSkew = c.getLong("hbase.master.warningclockskew", 10000); this.connection = master.getClusterConnection(); this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory(); + persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, + PERSIST_FLUSHEDSEQUENCEID_DEFAULT); } /** @@ -424,6 +460,11 @@ public class ServerManager { this.rsAdmins.remove(serverName); } + @VisibleForTesting + public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() { + return flushedSequenceIdByRegion; + } + public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) { RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder(); Long seqId = flushedSequenceIdByRegion.get(encodedRegionName); @@ -601,6 +642,10 @@ public class ServerManager { listener.serverRemoved(serverName); } } + // trigger a persist of flushedSeqId + if (flushedSeqIdFlusher != null) { + flushedSeqIdFlusher.triggerNow(); + } return true; } @@ -968,10 +1013,36 @@ public class ServerManager { } /** + * start chore in ServerManager + */ + public void startChore() { + Configuration c = master.getConfiguration(); + if (persistFlushedSequenceId) { + // when reach here, RegionStates should loaded, firstly, we call remove deleted regions + removeDeletedRegionFromLoadedFlushedSequenceIds(); + int flushPeriod = c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL, + FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT); + flushedSeqIdFlusher = new FlushedSequenceIdFlusher( + "FlushedSequenceIdFlusher", flushPeriod); + master.getChoreService().scheduleChore(flushedSeqIdFlusher); + } + } + + /** * Stop the ServerManager. */ public void stop() { - // Nothing to do. + if (flushedSeqIdFlusher != null) { + flushedSeqIdFlusher.cancel(); + } + if (persistFlushedSequenceId) { + try { + persistRegionLastFlushedSequenceIds(); + } catch (IOException e) { + LOG.warn("Failed to persist last flushed sequence id of regions" + + " to file system", e); + } + } } /** @@ -1065,4 +1136,144 @@ public class ServerManager { ServerMetrics serverMetrics = onlineServers.get(serverName); return serverMetrics != null ? 
@@ -1065,4 +1136,144 @@ public class ServerManager {
     ServerMetrics serverMetrics = onlineServers.get(serverName);
     return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
   }
+
+  /**
+   * Persist last flushed sequence id of each region to HDFS
+   * @throws IOException if persisting to HDFS fails
+   */
+  private void persistRegionLastFlushedSequenceIds() throws IOException {
+    if (isFlushSeqIdPersistInProgress) {
+      return;
+    }
+    isFlushSeqIdPersistInProgress = true;
+    try {
+      Configuration conf = master.getConfiguration();
+      Path rootDir = FSUtils.getRootDir(conf);
+      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(lastFlushedSeqIdPath)) {
+        LOG.info("Rewriting .lastflushedseqids file at: "
+            + lastFlushedSeqIdPath);
+        if (!fs.delete(lastFlushedSeqIdPath, false)) {
+          throw new IOException("Unable to remove existing "
+              + lastFlushedSeqIdPath);
+        }
+      } else {
+        LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
+      }
+      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
+      FlushedSequenceId.Builder flushedSequenceIdBuilder =
+          FlushedSequenceId.newBuilder();
+      try {
+        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
+          FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
+              FlushedRegionSequenceId.newBuilder();
+          flushedRegionSequenceIdBuilder.setRegionEncodedName(
+              ByteString.copyFrom(entry.getKey()));
+          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
+          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
+              storeFlushedSequenceIdsByRegion.get(entry.getKey());
+          if (storeSeqIds != null) {
+            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
+              FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
+                  FlushedStoreSequenceId.newBuilder();
+              flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
+              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
+              flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
+            }
+          }
+          flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
+        }
+        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
+      } finally {
+        if (out != null) {
+          out.close();
+        }
+      }
+    } finally {
+      isFlushSeqIdPersistInProgress = false;
+    }
+  }
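
persistRegionLastFlushedSequenceIds() deletes the previous snapshot and then creates the new file, so a crash between the two steps loses the snapshot entirely (the master then simply rebuilds the map from regionserver reports, so this costs speed, not correctness). A write-to-temp-then-rename variant would narrow that window. This sketch shows the idea only; it is not what the patch does, and the .tmp suffix and helper name are made up:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;

    final class AtomicSeqIdWriterSketch {
      static void write(FileSystem fs, Path finalPath, FlushedSequenceId ids)
          throws IOException {
        Path tmp = new Path(finalPath.getParent(), finalPath.getName() + ".tmp");
        // write the complete snapshot to a temporary file first
        try (FSDataOutputStream out = fs.create(tmp, true)) {
          ids.writeDelimitedTo(out);
        }
        // then swap it into place; a crash before the rename leaves the old
        // snapshot intact, a crash after it leaves the new one
        fs.delete(finalPath, false);
        if (!fs.rename(tmp, finalPath)) {
          throw new IOException("Failed to rename " + tmp + " to " + finalPath);
        }
      }
    }
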
+
+  /**
+   * Load last flushed sequence id of each region from HDFS, if persisted
+   */
+  public void loadLastFlushedSequenceIds() throws IOException {
+    if (!persistFlushedSequenceId) {
+      return;
+    }
+    Configuration conf = master.getConfiguration();
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
+    FileSystem fs = FileSystem.get(conf);
+    if (!fs.exists(lastFlushedSeqIdPath)) {
+      LOG.info("No .lastflushedseqids found at " + lastFlushedSeqIdPath
+          + ", will record last flushed sequence ids"
+          + " for regions from regionserver reports all over again");
+      return;
+    } else {
+      LOG.info("Begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
+    }
+    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
+    try {
+      FlushedSequenceId flushedSequenceId =
+          FlushedSequenceId.parseDelimitedFrom(in);
+      for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
+          .getRegionSequenceIdList()) {
+        byte[] encodedRegionName = flushedRegionSequenceId
+            .getRegionEncodedName().toByteArray();
+        flushedSequenceIdByRegion
+            .putIfAbsent(encodedRegionName, flushedRegionSequenceId.getSeqId());
+        if (flushedRegionSequenceId.getStoresList() != null
+            && flushedRegionSequenceId.getStoresList().size() != 0) {
+          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
+              computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
+                () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
+          for (FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
+              .getStoresList()) {
+            storeFlushedSequenceId
+                .put(flushedStoreSequenceId.getFamily().toByteArray(),
+                    flushedStoreSequenceId.getSeqId());
+          }
+        }
+      }
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Regions may have been removed between the latest persist of FlushedSequenceIds
+   * and a master abort. So after loading FlushedSequenceIds from file, and after
+   * meta is loaded, we need to remove the deleted regions according to RegionStates.
+   */
+  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
+    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
+    Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
+    while (it.hasNext()) {
+      byte[] regionEncodedName = it.next();
+      if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) == null) {
+        it.remove();
+        storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
+      }
+    }
+  }
+
+  private class FlushedSequenceIdFlusher extends ScheduledChore {
+
+    public FlushedSequenceIdFlusher(String name, int p) {
+      super(name, master, p, 60 * 1000); // delay one minute before the first execution
+    }
+
+    @Override
+    protected void chore() {
+      try {
+        persistRegionLastFlushedSequenceIds();
+      } catch (IOException e) {
+        LOG.debug("Failed to persist last flushed sequence id of regions"
+            + " to file system", e);
+      }
+    }
+  }
 }
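
The payoff of persisting this map is that getLastFlushedSequenceId() stays meaningful across a full cluster restart: during WAL split, an edit whose sequence id is at or below the last flushed id for its region is already durable in HFiles and can be skipped rather than replayed, which is what makes the split faster. A hedged sketch of that filtering decision; the WalSplitFilterSketch class and skipEdit helper are illustrative (real WAL splitting also consults the per-store ids), not code from this patch:

    import org.apache.hadoop.hbase.master.ServerManager;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;

    final class WalSplitFilterSketch {
      // Returns true if an edit with editSeqId in the given region was already
      // flushed and can therefore be skipped during WAL split.
      static boolean skipEdit(ServerManager serverManager,
          byte[] encodedRegionName, long editSeqId) {
        RegionStoreSequenceIds ids =
            serverManager.getLastFlushedSequenceId(encodedRegionName);
        return editSeqId <= ids.getLastFlushedSequenceId();
      }
    }
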
http://git-wip-us.apache.org/repos/asf/hbase/blob/b336da92/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
index 11df313..8abaf60 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -192,5 +195,35 @@ public class TestMaster {
       TEST_UTIL.deleteTable(tableName);
     }
   }
+
+  @Test
+  public void testFlushedSequenceIdPersistLoad() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    int msgInterval = conf.getInt("hbase.regionserver.msginterval", 100);
+    // creating a table inserts some data into META
+    TableName tableName = TableName.valueOf("testFlushSeqId");
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(Bytes.toBytes("cf")));
+    Table table = TEST_UTIL.createTable(desc, null);
+    // flush META region
+    TEST_UTIL.flush(TableName.META_TABLE_NAME);
+    // wait for regionserver report
+    Threads.sleep(msgInterval * 2);
+    // record flushed sequence ids before cluster shutdown
+    Map<byte[], Long> regionMapBefore =
+        TEST_UTIL.getHBaseCluster().getMaster().getServerManager()
+            .getFlushedSequenceIdByRegion();
+    // restart the hbase cluster, which causes the flushed sequence ids to be
+    // persisted and then reloaded
+    TEST_UTIL.getMiniHBaseCluster().shutdown();
+    TEST_UTIL.restartHBaseCluster(2);
+    TEST_UTIL.waitUntilNoRegionsInTransition();
+    // check equality after reloading the flushed sequence id map
+    Map<byte[], Long> regionMapAfter =
+        TEST_UTIL.getHBaseCluster().getMaster().getServerManager()
+            .getFlushedSequenceIdByRegion();
+    assertTrue(regionMapBefore.equals(regionMapAfter));
+  }
 }
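
For debugging, the persisted file can be read back outside the master with the same delimited-protobuf call loadLastFlushedSequenceIds() uses. A small sketch of such a dump tool, assuming only the classes added by this patch; the tool's name and argument handling are made up:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;

    public final class DumpLastFlushedSeqIdsSketch {
      public static void main(String[] args) throws Exception {
        // args[0]: path to the .lastflushedseqids file under the hbase root dir
        Configuration conf = HBaseConfiguration.create();
        Path path = new Path(args[0]);
        FileSystem fs = path.getFileSystem(conf);
        try (FSDataInputStream in = fs.open(path)) {
          FlushedSequenceId ids = FlushedSequenceId.parseDelimitedFrom(in);
          for (FlushedRegionSequenceId region : ids.getRegionSequenceIdList()) {
            System.out.println(region.getRegionEncodedName().toStringUtf8()
                + " => " + region.getSeqId()
                + " (" + region.getStoresCount() + " stores)");
          }
        }
      }
    }
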