HBASE-9465 Push entries to peer clusters serially Signed-off-by: zhangduo <zhang...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5cadcd59 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5cadcd59 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5cadcd59 Branch: refs/heads/hbase-12439 Commit: 5cadcd59aa57c9566349dc8551c958dc974e774e Parents: 1ecb0fc Author: Phil Yang <ud1...@gmail.com> Authored: Thu Aug 4 10:11:56 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Tue Aug 9 15:25:50 2016 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/HTableDescriptor.java | 12 + .../apache/hadoop/hbase/MetaTableAccessor.java | 234 ++++++++++- .../hbase/client/ConnectionImplementation.java | 1 + .../client/replication/ReplicationAdmin.java | 14 +- .../org/apache/hadoop/hbase/HConstants.java | 26 ++ .../src/main/resources/hbase-default.xml | 14 + .../hbase/protobuf/generated/WALProtos.java | 18 +- hbase-protocol/src/main/protobuf/WAL.proto | 1 + .../org/apache/hadoop/hbase/master/HMaster.java | 5 + .../hadoop/hbase/master/RegionStateStore.java | 47 ++- .../master/cleaner/ReplicationMetaCleaner.java | 186 +++++++++ .../hbase/regionserver/wal/FSWALEntry.java | 1 - .../replication/regionserver/Replication.java | 12 + .../regionserver/ReplicationSource.java | 127 +++++- .../regionserver/ReplicationSourceManager.java | 87 +++- .../hadoop/hbase/util/FSTableDescriptors.java | 24 ++ .../java/org/apache/hadoop/hbase/wal/WAL.java | 16 + .../hadoop/hbase/TestMetaTableAccessor.java | 8 +- .../master/TestAssignmentManagerOnCluster.java | 2 +- .../replication/TestSerialReplication.java | 399 +++++++++++++++++++ 20 files changed, 1176 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index ccad414..9abdf42 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -1115,6 +1115,18 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> { } /** + * Returns true if there is at least one column family whose replication scope is serial.
+ */ + public boolean hasSerialReplicationScope() { + for (HColumnDescriptor column: getFamilies()){ + if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL){ + return true; + } + } + return false; + } + + /** * Returns the configured replicas per region */ public int getRegionReplication() { http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index a5dbc94..1eaa753 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -17,11 +17,15 @@ */ package org.apache.hadoop.hbase; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ServiceException; + import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -34,8 +38,6 @@ import java.util.regex.Pattern; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -113,14 +115,31 @@ public class MetaTableAccessor { * region is the result of a merge * info:mergeB => contains a serialized HRI for the second parent region if the * region is the result of a merge - * * The actual layout of meta should be encapsulated inside MetaTableAccessor methods, * and should not leak out of it (through Result objects, etc) + * + * For serial replication, there are two column families, "rep_barrier" and "rep_position", + * whose row key is the encoded region name. + * rep_barrier:{seqid} => each time a RS opens the region, it saves the open sequence + * id here + * rep_position:{peerid} => the max sequence id we have pushed to each peer + * rep_position:_TABLENAME_ => a special cell saving this region's table name, used when + * we clean old data + * rep_position:_DAUGHTER_ => a special cell marking that this region was split or merged; + * the value is the merged region's encoded name, or the two + * split regions' encoded names separated by "," */ private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class); private static final Log METALOG = LogFactory.getLog("org.apache.hadoop.hbase.META"); + // Save its daughter region(s) when split/merge + private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_"); + + // Save its table name because we only know the region's encoded name + private static final String tableNamePeer = "_TABLENAME_"; + private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer); + static final byte [] META_REGION_PREFIX; static { // Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
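For illustration only, here is a minimal sketch (not part of this patch) of reading that bookkeeping back out of hbase:meta through the accessors added further down in this file's diff; the Configuration "conf", the encoded region name, and the peer id "1" are made up:

    // Sketch: inspect serial-replication state for one hypothetical region.
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      byte[] region = Bytes.toBytes("30c4d0a47703214845d0676d0c7b36dc"); // hypothetical
      // rep_barrier: open sequence ids recorded each time a RS opened this region.
      List<Long> barriers = MetaTableAccessor.getReplicationBarriers(conn, region);
      // rep_position: highest sequence id already pushed, keyed by peer id.
      Map<String, Long> positions =
          MetaTableAccessor.getReplicationPositionForAllPeer(conn, region);
      // Position for a single peer; -1 if nothing has been recorded yet.
      long pos = MetaTableAccessor.getReplicationPositionForOnePeer(conn, region, "1");
    }
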
@@ -1318,6 +1337,19 @@ public class MetaTableAccessor { return delete; } + public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName) { + byte[] seqBytes = Bytes.toBytes(seq); + return new Put(encodedRegionName) + .addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes) + .addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName); + } + + + public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) { + return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY, + daughterNamePosCq, value); + } + /** * Adds split daughters to the Put */ @@ -1334,27 +1366,28 @@ public class MetaTableAccessor { } /** - * Put the passed <code>p</code> to the <code>hbase:meta</code> table. + * Put the passed <code>puts</code> to the <code>hbase:meta</code> table. + * Non-atomic for multi puts. * @param connection connection we're using - * @param p Put to add to hbase:meta + * @param puts Put to add to hbase:meta * @throws IOException */ - static void putToMetaTable(final Connection connection, final Put p) + static void putToMetaTable(final Connection connection, final Put... puts) throws IOException { - put(getMetaHTable(connection), p); + put(getMetaHTable(connection), Arrays.asList(puts)); } /** * @param t Table to use (will be closed when done). - * @param p put to make + * @param puts puts to make * @throws IOException */ - private static void put(final Table t, final Put p) throws IOException { + private static void put(final Table t, final List<Put> puts) throws IOException { try { if (METALOG.isDebugEnabled()) { - METALOG.debug(mutationToString(p)); + METALOG.debug(mutationsToString(puts)); } - t.put(p); + t.put(puts); } finally { t.close(); } @@ -1490,7 +1523,7 @@ public class MetaTableAccessor { * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this * does not add its daughter's as different rows, but adds information about the daughters * in the same row as the parent. Use - * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)} + * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)} * if you want to do that. * @param meta the Table for META * @param regionInfo region information @@ -1515,7 +1548,7 @@ public class MetaTableAccessor { * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this * does not add its daughter's as different rows, but adds information about the daughters * in the same row as the parent. Use - * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)} + * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)} * if you want to do that. 
* @param connection connection we're using * @param regionInfo region information @@ -1601,11 +1634,12 @@ public class MetaTableAccessor { * @param regionB * @param sn the location of the region * @param masterSystemTime + * @param saveBarrier true if the replication barrier should be saved in meta; used for serial replication * @throws IOException */ public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication, - long masterSystemTime) + long masterSystemTime, boolean saveBarrier) throws IOException { Table meta = getMetaHTable(connection); try { @@ -1636,7 +1670,17 @@ public class MetaTableAccessor { byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER); - multiMutate(meta, tableRow, putOfMerged, deleteA, deleteB); + Mutation[] mutations; + if (saveBarrier) { + Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(), + Bytes.toBytes(mergedRegion.getEncodedName())); + Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(), + Bytes.toBytes(mergedRegion.getEncodedName())); + mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB }; + } else { + mutations = new Mutation[] { putOfMerged, deleteA, deleteB }; + } + multiMutate(meta, tableRow, mutations); } finally { meta.close(); } @@ -1652,10 +1696,11 @@ * @param splitA Split daughter region A * @param splitB Split daughter region B * @param sn the location of the region + * @param saveBarrier true if the replication barrier should be saved in meta; used for serial replication */ - public static void splitRegion(final Connection connection, - HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB, - ServerName sn, int regionReplication) throws IOException { + public static void splitRegion(final Connection connection, HRegionInfo parent, + HRegionInfo splitA, HRegionInfo splitB, ServerName sn, int regionReplication, + boolean saveBarrier) throws IOException { Table meta = getMetaHTable(connection); try { HRegionInfo copyOfParent = new HRegionInfo(parent); @@ -1680,8 +1725,17 @@ addEmptyLocation(putB, i); } + Mutation[] mutations; + if (saveBarrier) { + Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(), + Bytes + .toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName())); + mutations = new Mutation[]{putParent, putA, putB, putBarrier}; + } else { + mutations = new Mutation[]{putParent, putA, putB}; + } byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER); - multiMutate(meta, tableRow, putParent, putA, putB); + multiMutate(meta, tableRow, mutations); } finally { meta.close(); } @@ -1781,6 +1835,27 @@ } /** + * Updates the progress of pushing entries to the peer cluster. A negative value marks a region close; its absolute value is what gets stored.
+ * @param connection connection we're using + * @param peerId the peerId to push + * @param positions map that saving positions for each region + * @throws IOException + */ + public static void updateReplicationPositions(Connection connection, String peerId, + Map<String, Long> positions) throws IOException { + List<Put> puts = new ArrayList<>(); + for (Map.Entry<String, Long> entry : positions.entrySet()) { + long value = Math.abs(entry.getValue()); + Put put = new Put(Bytes.toBytes(entry.getKey())); + put.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId), + Bytes.toBytes(value)); + puts.add(put); + } + getMetaHTable(connection).put(puts); + } + + + /** * Updates the location of the specified region to be the specified server. * <p> * Connects to the specified server which should be hosting the specified @@ -1977,4 +2052,125 @@ public class MetaTableAccessor { private static String mutationToString(Mutation p) throws IOException { return p.getClass().getSimpleName() + p.toJSON(); } + + /** + * Get replication position for a peer in a region. + * @param connection connection we're using + * @return the position of this peer, -1 if no position in meta. + */ + public static long getReplicationPositionForOnePeer(Connection connection, + byte[] encodedRegionName, String peerId) throws IOException { + Get get = new Get(encodedRegionName); + get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId)); + Result r = get(getMetaHTable(connection), get); + if (r.isEmpty()) { + return -1; + } + Cell cell = r.rawCells()[0]; + return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()); + } + + /** + * Get replication positions for all peers in a region. + * @param connection connection we're using + * @param encodedRegionName region's encoded name + * @return the map of positions for each peer + */ + public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection, + byte[] encodedRegionName) throws IOException { + Get get = new Get(encodedRegionName); + get.addFamily(HConstants.REPLICATION_POSITION_FAMILY); + Result r = get(getMetaHTable(connection), get); + Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1)); + for (Cell c : r.listCells()) { + if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(), + c.getQualifierOffset(), c.getQualifierLength()) && + !Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(), + c.getQualifierOffset(), c.getQualifierLength())) { + map.put( + Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()), + Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength())); + } + } + return map; + } + + /** + * Get replication barriers for all peers in a region. + * @param encodedRegionName region's encoded name + * @return a list of barrier sequence numbers. + * @throws IOException + */ + public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName) + throws IOException { + Get get = new Get(encodedRegionName); + get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); + Result r = get(getMetaHTable(connection), get); + List<Long> list = new ArrayList<>(); + if (!r.isEmpty()) { + for (Cell cell : r.rawCells()) { + list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength())); + } + } + return list; + } + + /** + * Get all barriers in all regions. 
+ * @return a map of barrier lists in all regions + * @throws IOException + */ + public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException { + Map<String, List<Long>> map = new HashMap<>(); + Scan scan = new Scan(); + scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); + try (Table t = getMetaHTable(connection); + ResultScanner scanner = t.getScanner(scan)) { + Result result; + while ((result = scanner.next()) != null) { + String key = Bytes.toString(result.getRow()); + List<Long> list = new ArrayList<>(); + for (Cell cell : result.rawCells()) { + list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength())); + } + map.put(key, list); + } + } + return map; + } + + /** + * Get daughter region(s) for a region, only used in serial replication. + * @throws IOException + */ + public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName) + throws IOException { + Get get = new Get(encodedName); + get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq); + Result result = get(getMetaHTable(connection), get); + if (!result.isEmpty()) { + Cell c = result.rawCells()[0]; + return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength()); + } + return null; + } + + /** + * Get the table name for a region, only used in serial replication. + * @throws IOException + */ + public static String getSerialReplicationTableName(Connection connection, byte[] encodedName) + throws IOException { + Get get = new Get(encodedName); + get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq); + Result result = get(getMetaHTable(connection), get); + if (!result.isEmpty()) { + Cell c = result.rawCells()[0]; + return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength()); + } + return null; + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 04edd25..37c62c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -721,6 +721,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { Scan s = new Scan(); s.setReversed(true); s.setStartRow(metaKey); + s.addFamily(HConstants.CATALOG_FAMILY); s.setSmall(true); s.setCaching(1); if (this.useMetaReplicas) { http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index dca1821..ee26e38 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -92,8 +92,10 @@ public class ReplicationAdmin implements Closeable { // only Global for now, can add other type // such as, 1) no 
global replication, or 2) the table is replicated to this cluster, etc. public static final String REPLICATIONTYPE = "replicationType"; - public static final String REPLICATIONGLOBAL = Integer - .toString(HConstants.REPLICATION_SCOPE_GLOBAL); + public static final String REPLICATIONGLOBAL = + Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL); + public static final String REPLICATIONSERIAL = + Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL); private final Connection connection; // TODO: replication should be managed by master. All the classes except ReplicationAdmin should @@ -430,7 +432,10 @@ public class ReplicationAdmin implements Closeable { HashMap<String, String> replicationEntry = new HashMap<String, String>(); replicationEntry.put(TNAME, tableName); replicationEntry.put(CFNAME, column.getNameAsString()); - replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL); + replicationEntry.put(REPLICATIONTYPE, + column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ? + REPLICATIONGLOBAL : + REPLICATIONSERIAL); replicationColFams.add(replicationEntry); } } @@ -616,7 +621,8 @@ public class ReplicationAdmin implements Closeable { */ private boolean isTableRepEnabled(HTableDescriptor htd) { for (HColumnDescriptor hcd : htd.getFamilies()) { - if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) { + if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL + && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { return false; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index ce18ef5..4c499a2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -429,6 +429,20 @@ public final class HConstants { /** The catalog family */ public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR); + /** The replication barrier family as a string */ + public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier"; + + /** The replication barrier family */ + public static final byte [] REPLICATION_BARRIER_FAMILY = + Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR); + + /** The replication position family as a string */ + public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position"; + + /** The replication position family */ + public static final byte [] REPLICATION_POSITION_FAMILY = + Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR); + /** The RegionInfo qualifier as a string */ public static final String REGIONINFO_QUALIFIER_STR = "regioninfo"; @@ -636,6 +650,12 @@ public final class HConstants { public static final int REPLICATION_SCOPE_GLOBAL = 1; /** + * Scope tag for serially scoped data. + * This data will be replicated to all peers in the order of sequence id. + */ + public static final int REPLICATION_SCOPE_SERIAL = 2; + + /** * Default cluster ID, cannot be used to identify a cluster so a key with * this value means it wasn't meant for replication. */
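As a usage sketch (not part of this patch; the table name "t1", family "f", and the Admin handle "admin" are made up), a column family opts into serial replication by setting its scope to the new constant:

    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
    HColumnDescriptor fam = new HColumnDescriptor("f");
    fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL); // 2: push edits in sequence-id order
    htd.addFamily(fam);
    admin.createTable(htd);
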
@@ -866,6 +886,12 @@ public final class HConstants { public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false; /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */ public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id"; + + public static final String + REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs"; + public static final long + REPLICATION_SERIALLY_WAITING_DEFAULT = 10000; + /** * Directory where the source cluster file system client configuration are placed which is used by * sink cluster to copy HFiles from source cluster file system http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-common/src/main/resources/hbase-default.xml ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 116c7d9..a791717 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1571,6 +1571,20 @@ possible configurations would overwhelm and obscure the important. slave clusters. The default of 10 will rarely need to be changed. </description> </property> + <property> + <name>hbase.serial.replication.waitingMs</name> + <value>10000</value> + <description> + By default, replication cannot guarantee that the order of operations on the slave cluster is + the same as the order on the master. Setting REPLICATION_SCOPE to 2 pushes edits in the order + they were written. This property sets how long (in ms) to wait before re-checking a log that + cannot be pushed yet because some logs written before it have not been pushed. A larger value + reduces the number of queries against hbase:meta but increases the replication delay. This + feature relies on zk-less assignment and conflicts with distributed log replay, so users must + set hbase.assignment.usezk and hbase.master.distributed.log.replay to false to use it. + </description> + </property> <!-- Static Web User Filter properties.
--> <property> <description> http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index 28f4d4b..a675b12 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -21,6 +21,10 @@ public final class WALProtos { * <code>REPLICATION_SCOPE_GLOBAL = 1;</code> */ REPLICATION_SCOPE_GLOBAL(1, 1), + /** + * <code>REPLICATION_SCOPE_SERIAL = 2;</code> + */ + REPLICATION_SCOPE_SERIAL(2, 2), ; /** @@ -31,6 +35,10 @@ public final class WALProtos { * <code>REPLICATION_SCOPE_GLOBAL = 1;</code> */ public static final int REPLICATION_SCOPE_GLOBAL_VALUE = 1; + /** + * <code>REPLICATION_SCOPE_SERIAL = 2;</code> + */ + public static final int REPLICATION_SCOPE_SERIAL_VALUE = 2; public final int getNumber() { return value; } @@ -39,6 +47,7 @@ public final class WALProtos { switch (value) { case 0: return REPLICATION_SCOPE_LOCAL; case 1: return REPLICATION_SCOPE_GLOBAL; + case 2: return REPLICATION_SCOPE_SERIAL; default: return null; } } @@ -12013,11 +12022,12 @@ public final class WALProtos { "\030\005 \003(\0132\031.hbase.pb.StoreDescriptor\022$\n\006ser" + "ver\030\006 \001(\0132\024.hbase.pb.ServerName\022\023\n\013regio" + "n_name\030\007 \001(\014\".\n\tEventType\022\017\n\013REGION_OPEN" + - "\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrailer*F\n\tSc" + + "\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrailer*d\n\tSc" + "opeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030" + - "REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apache" + - ".hadoop.hbase.protobuf.generatedB\tWALPro" + - "tosH\001\210\001\000\240\001\001" + "REPLICATION_SCOPE_GLOBAL\020\001\022\034\n\030REPLICATIO" + + "N_SCOPE_SERIAL\020\002B?\n*org.apache.hadoop.hb" + + "ase.protobuf.generatedB\tWALProtosH\001\210\001\000\240\001" + + "\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-protocol/src/main/protobuf/WAL.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index c1d465a..2494977 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -75,6 +75,7 @@ message WALKey { enum ScopeType { REPLICATION_SCOPE_LOCAL = 0; REPLICATION_SCOPE_GLOBAL = 1; + REPLICATION_SCOPE_SERIAL = 2; } message FamilyScope { http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 5ce056d..2022c5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -98,6 +98,7 @@ import 
org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; +import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore; import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan; import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; @@ -311,6 +312,7 @@ public class HMaster extends HRegionServer implements MasterServices { CatalogJanitor catalogJanitorChore; private ReplicationZKLockCleanerChore replicationZKLockCleanerChore; + private ReplicationMetaCleaner replicationMetaCleaner; private LogCleaner logCleaner; private HFileCleaner hfileCleaner; private ExpiredMobFileCleanerChore expiredMobFileCleanerChore; @@ -988,6 +990,8 @@ public class HMaster extends HRegionServer implements MasterServices { LOG.error("start replicationZKLockCleanerChore failed", e); } } + replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval); + getChoreService().scheduleChore(replicationMetaCleaner); } @Override @@ -1022,6 +1026,7 @@ public class HMaster extends HRegionServer implements MasterServices { if (this.logCleaner != null) this.logCleaner.cancel(true); if (this.hfileCleaner != null) this.hfileCleaner.cancel(true); if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true); + if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true); if (this.quotaManager != null) this.quotaManager.stop(); if (this.activeMasterManager != null) this.activeMasterManager.stop(); if (this.serverManager != null) this.serverManager.stop(); http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index 82e28df..2dbc087 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -17,22 +17,25 @@ */ package org.apache.hadoop.hbase.master; +import com.google.common.base.Preconditions; + import java.io.IOException; import java.util.Arrays; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.master.RegionState.State; @@ -44,8 +47,6 @@ import org.apache.hadoop.hbase.util.MultiHConnection; import 
org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.zookeeper.KeeperException; -import com.google.common.base.Preconditions; - /** * A helper to persist region state in meta. We may change this class * to StateStore later if we also use it to store other states in meta @@ -60,7 +61,7 @@ public class RegionStateStore { private volatile Region metaRegion; private volatile boolean initialized; private MultiHConnection multiHConnection; - private final Server server; + private final MasterServices server; /** * Returns the {@link ServerName} from catalog table {@link Result} @@ -130,7 +131,7 @@ public class RegionStateStore { State.SPLITTING_NEW, State.MERGED)); } - RegionStateStore(final Server server) { + RegionStateStore(final MasterServices server) { this.server = server; initialized = false; } @@ -187,31 +188,41 @@ public class RegionStateStore { State state = newState.getState(); int replicaId = hri.getReplicaId(); - Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri)); + Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri)); StringBuilder info = new StringBuilder("Updating hbase:meta row "); info.append(hri.getRegionNameAsString()).append(" with state=").append(state); if (serverName != null && !serverName.equals(oldServer)) { - put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId), + metaPut.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId), Bytes.toBytes(serverName.getServerName())); info.append(", sn=").append(serverName); } if (openSeqNum >= 0) { Preconditions.checkArgument(state == State.OPEN && serverName != null, "Open region should be on a server"); - MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId); + MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, replicaId); info.append(", openSeqNum=").append(openSeqNum); info.append(", server=").append(serverName); } - put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId), + metaPut.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId), Bytes.toBytes(state.name())); LOG.info(info); - + HTableDescriptor descriptor = server.getTableDescriptors().get(hri.getTable()); + boolean serial = false; + if (descriptor != null) { + serial = server.getTableDescriptors().get(hri.getTable()).hasSerialReplicationScope(); + } + boolean shouldPutBarrier = serial && state == State.OPEN; // Persist the state change to meta if (metaRegion != null) { try { // Assume meta is pinned to master. // At least, that's what we want. - metaRegion.put(put); + metaRegion.put(metaPut); + if (shouldPutBarrier) { + Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(), + openSeqNum, hri.getTable().getName()); + metaRegion.put(barrierPut); + } return; // Done here } catch (Throwable t) { // In unit tests, meta could be moved away by intention @@ -230,8 +241,10 @@ public class RegionStateStore { } } // Called when meta is not on master - multiHConnection.processBatchCallback(Arrays.asList(put), - TableName.META_TABLE_NAME, null, null); + List<Put> list = shouldPutBarrier ? 
+ Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(), + openSeqNum, hri.getTable().getName())) : Arrays.asList(metaPut); + multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null); } catch (IOException ioe) { LOG.error("Failed to persist region state " + newState, ioe); @@ -241,12 +254,14 @@ public class RegionStateStore { void splitRegion(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException { - MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication); + MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication, + server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope()); } void mergeRegions(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException { MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication, - EnvironmentEdgeManager.currentTime()); + EnvironmentEdgeManager.currentTime(), + server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java new file mode 100644 index 0000000..e9647e8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.cleaner; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This chore is to clean up the useless data in hbase:meta which is used by serial replication. + */ +@InterfaceAudience.Private +public class ReplicationMetaCleaner extends ScheduledChore { + + private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class); + + private ReplicationAdmin replicationAdmin; + private MasterServices master; + + public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period) + throws IOException { + super("ReplicationMetaCleaner", stoppable, period); + this.master = master; + replicationAdmin = new ReplicationAdmin(master.getConfiguration()); + } + + @Override + protected void chore() { + try { + Map<String, HTableDescriptor> tables = master.getTableDescriptors().getAllDescriptors(); + Map<String, Set<String>> serialTables = new HashMap<>(); + for (Map.Entry<String, HTableDescriptor> entry : tables.entrySet()) { + boolean hasSerialScope = false; + for (HColumnDescriptor column : entry.getValue().getFamilies()) { + if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) { + hasSerialScope = true; + break; + } + } + if (hasSerialScope) { + serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<String>()); + } + } + if (serialTables.isEmpty()){ + return; + } + + Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs(); + for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) { + for (Map.Entry<TableName, List<String>> map : entry.getValue().getTableCFsMap() + .entrySet()) { + if (serialTables.containsKey(map.getKey().getNameAsString())) { + serialTables.get(map.getKey().getNameAsString()).add(entry.getKey()); + break; + } + } + } + + Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection()); + for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) { + String encodedName = entry.getKey(); + byte[] encodedBytes = Bytes.toBytes(encodedName); + boolean canClearRegion = false; + Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer( + master.getConnection(), encodedBytes); + if (posMap.isEmpty()) { + continue; + } + + String tableName = MetaTableAccessor.getSerialReplicationTableName( + master.getConnection(), encodedBytes); + Set<String> confPeers = serialTables.get(tableName); + if (confPeers == null) { + // This table doesn't exist or all cf's scope is not serial any more, we can clear meta. 
+ canClearRegion = true; + } else { + if (!allPeersHavePosition(confPeers, posMap)) { + continue; + } + + String daughterValue = MetaTableAccessor + .getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes); + if (daughterValue != null) { + //this region is merged or split + boolean allDaughterStart = true; + String[] daughterRegions = daughterValue.split(","); + for (String daughter : daughterRegions) { + byte[] region = Bytes.toBytes(daughter); + if (!MetaTableAccessor.getReplicationBarriers( + master.getConnection(), region).isEmpty() && + !allPeersHavePosition(confPeers, + MetaTableAccessor + .getReplicationPositionForAllPeer(master.getConnection(), region))) { + allDaughterStart = false; + break; + } + } + if (allDaughterStart) { + canClearRegion = true; + } + } + } + if (canClearRegion) { + Delete delete = new Delete(encodedBytes); + delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY); + delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); + try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) { + metaTable.delete(delete); + } + } else { + + // Barriers whose seq is larger than min pos of all peers, and the last barrier whose seq + // is smaller than min pos should be kept. All other barriers can be deleted. + + long minPos = Long.MAX_VALUE; + for (Map.Entry<String, Long> pos : posMap.entrySet()) { + minPos = Math.min(minPos, pos.getValue()); + } + List<Long> barriers = entry.getValue(); + int index = Collections.binarySearch(barriers, minPos); + if (index < 0) { + index = -index - 1; + } + Delete delete = new Delete(encodedBytes); + for (int i = 0; i < index - 1; i++) { + delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i))); + } + try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) { + metaTable.delete(delete); + } + } + + } + + } catch (IOException e) { + LOG.error("Exception during cleaning up.", e); + } + + } + + private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap) + throws IOException { + for(String peer:peers){ + if (!posMap.containsKey(peer)){ + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 3449832..72474a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -125,7 +125,6 @@ class FSWALEntry extends Entry { CellUtil.setSequenceId(c, regionSequenceId); } } - getKey().setWriteEntry(we); return regionSequenceId; } http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 4f518bb..741065a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; @@ -279,6 +280,17 @@ public class Replication extends WALActionsListener.Base implements for (Cell cell : logEdit.getCells()) { if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { foundOtherEdits = true; + break; + } + } + + if (!foundOtherEdits && logEdit.getCells().size() > 0) { + WALProtos.RegionEventDescriptor maybeEvent = + WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); + if (maybeEvent != null && (maybeEvent.getEventType() == + WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { + // In serial replication, we use scopes when reading the close marker. + foundOtherEdits = true; } } if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2f3b2a8..ce0fb06 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -18,6 +18,10 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; @@ -29,8 +33,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; @@ -48,9 +54,11 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -102,6 +110,8 @@ public class ReplicationSource extends Thread private ReplicationQueueInfo replicationQueueInfo; // id of the peer cluster this source
replicates to private String peerId; + + String actualPeerId; // The manager of all sources to which we ping back our progress private ReplicationSourceManager manager; // Should we stop everything? @@ -185,6 +195,8 @@ public class ReplicationSource extends Thread this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + this.actualPeerId = replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); this.replicationEndpoint = replicationEndpoint; } @@ -507,6 +519,17 @@ public class ReplicationSource extends Thread // Current number of hfiles that we need to replicate private long currentNbHFiles = 0; + // Use guava cache to set ttl for each key + private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder() + .expireAfterAccess(1, TimeUnit.DAYS).build( + new CacheLoader<String, Boolean>() { + @Override + public Boolean load(String key) throws Exception { + return false; + } + } + ); + public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) { @@ -588,9 +611,24 @@ public class ReplicationSource extends Thread currentNbOperations = 0; currentNbHFiles = 0; List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1); + + Map<String, Long> lastPositionsForSerialScope = new HashMap<>(); currentSize = 0; try { - if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) { + if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries, + lastPositionsForSerialScope)) { + for (Map.Entry<String, Long> entry : lastPositionsForSerialScope.entrySet()) { + waitingUntilCanPush(entry); + } + try { + MetaTableAccessor + .updateReplicationPositions(manager.getConnection(), actualPeerId, + lastPositionsForSerialScope); + } catch (IOException e) { + LOG.error("updateReplicationPositions fail", e); + stopper.stop("updateReplicationPositions fail"); + } + continue; } } catch (IOException ioe) { @@ -626,15 +664,30 @@ public class ReplicationSource extends Thread LOG.warn("Unable to finalize the tailing of a file", e); } } - + for(Map.Entry<String, Long> entry: lastPositionsForSerialScope.entrySet()) { + waitingUntilCanPush(entry); + } // If we didn't get anything to replicate, or if we hit a IOE, // wait a bit and retry. // But if we need to stop, don't bother sleeping if (isWorkerActive() && (gotIOE || entries.isEmpty())) { if (this.lastLoggedPosition != this.repLogReader.getPosition()) { - manager.logPositionAndCleanOldLogs(this.currentPath, - peerClusterZnode, this.repLogReader.getPosition(), + + // Save positions to meta table before zk. 
+ if (!gotIOE) { + try { + MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId, + lastPositionsForSerialScope); + } catch (IOException e) { + LOG.error("updateReplicationPositions fail", e); + stopper.stop("updateReplicationPositions fail"); + } + } + + manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode, + this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); + this.lastLoggedPosition = this.repLogReader.getPosition(); } // Reset the sleep multiplier if nothing has actually gone wrong @@ -649,8 +702,7 @@ } continue; } - sleepMultiplier = 1; - shipEdits(currentWALisBeingWrittenTo, entries); + shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope); } if (replicationQueueInfo.isQueueRecovered()) { // use synchronize to make sure one last thread will clean the queue @@ -672,16 +724,42 @@ } } + private void waitingUntilCanPush(Map.Entry<String, Long> entry) { + String key = entry.getKey(); + long seq = entry.getValue(); + boolean deleteKey = false; + if (seq <= 0) { + // This is a REGION_CLOSE marker; we cannot continue skipping after this entry. + deleteKey = true; + seq = -seq; + } + + if (!canSkipWaitingSet.getUnchecked(key)) { + try { + manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId); + } catch (Exception e) { + LOG.error("waitUntilCanBePushed fail", e); + stopper.stop("waitUntilCanBePushed fail"); + } + canSkipWaitingSet.put(key, true); + } + if (deleteKey) { + canSkipWaitingSet.invalidate(key); + } + } + /** * Read all the entries from the current log files and retain those that need to be replicated. * Else, process the end of the current file. * @param currentWALisBeingWrittenTo is the current WAL being written to * @param entries resulting entries to be replicated + * @param lastPosition save the last sequenceid for each region if the table has + * serial-replication scope * @return true if we got nothing and went to the next file, false if we got entries * @throws IOException */ protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, - List<WAL.Entry> entries) throws IOException { + List<WAL.Entry> entries, Map<String, Long> lastPosition) throws IOException { long seenEntries = 0; if (LOG.isTraceEnabled()) { LOG.trace("Seeking in " + this.currentPath + " at position " @@ -694,6 +772,27 @@ metrics.incrLogEditsRead(); seenEntries++; + if (entry.hasSerialReplicationScope()) { + String key = Bytes.toString(entry.getKey().getEncodedRegionName()); + lastPosition.put(key, entry.getKey().getSequenceId()); + if (entry.getEdit().getCells().size() > 0) { + WALProtos.RegionEventDescriptor maybeEvent = + WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); + if (maybeEvent != null && maybeEvent.getEventType() + == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) { + // In serial replication, if we move a region to another RS and move it back, we may + // read logs crossing two sections. We should break at REGION_CLOSE and push the first + // section first, to avoid missing the middle section that belongs to the other RS.
+ // In a worker thread, if we can push the first log of a region, we can push all logs + // in the same region without waiting until we read a close marker because next time + // we read logs in this region, it must be a new section and not adjacent with this + // region. Mark it negative. + lastPosition.put(key, -entry.getKey().getSequenceId()); + break; + } + } + } + // don't replicate if the log entries have already been consumed by the cluster if (replicationEndpoint.canReplicateToSameCluster() || !entry.getKey().getClusterIds().contains(peerClusterId)) { @@ -723,6 +822,7 @@ public class ReplicationSource extends Thread || entries.size() >= replicationQueueNbCapacity) { break; } + try { entry = this.repLogReader.readNextAndSetPosition(); } catch (IOException ie) { @@ -995,7 +1095,8 @@ public class ReplicationSource extends Thread * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) * written to when this method was called */ - protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) { + protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries, + Map<String, Long> lastPositionsForSerialScope) { int sleepMultiplier = 0; if (entries.isEmpty()) { LOG.warn("Was given 0 edits to ship"); @@ -1046,6 +1147,16 @@ public class ReplicationSource extends Thread for (int i = 0; i < size; i++) { cleanUpHFileRefs(entries.get(i).getEdit()); } + + // Save positions to meta table before zk. + try { + MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId, + lastPositionsForSerialScope); + } catch (IOException e) { + LOG.error("updateReplicationPositions fail", e); + stopper.stop("updateReplicationPositions fail"); + } + //Log and clean up WAL logs manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode, this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(), http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 3cb7a84..a6f1891 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -48,10 +49,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; 
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3cb7a84..a6f1891 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -48,10 +49,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -64,6 +68,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

 /**
@@ -118,6 +123,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Random rand;
   private final boolean replicationForBulkLoadDataEnabled;

+  private Connection connection;
+  private long replicationWaitTime;
+
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
@@ -134,7 +141,7 @@
   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
-      final Path oldLogDir, final UUID clusterId) {
+      final Path oldLogDir, final UUID clusterId) throws IOException {
     //CopyOnWriteArrayList is thread-safe.
     //Generally, reading is more than modifying.
     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@@ -171,6 +178,9 @@
     replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
+      HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
+    connection = ConnectionFactory.createConnection(conf);
   }

@@ -782,6 +792,10 @@
     return this.fs;
   }

+  public Connection getConnection() {
+    return this.connection;
+  }
+
   /**
    * Get the ReplicationPeers used by this ReplicationSourceManager
    * @return the ReplicationPeers used by this ReplicationSourceManager
@@ -814,4 +828,75 @@
   public void cleanUpHFileRefs(String peerId, List<String> files) {
     this.replicationQueues.removeHFileRefs(peerId, files);
   }
+
+  /**
+   * Whether an entry can be pushed to the peer right now.
+   * If serial replication is enabled, we cannot push an entry until all entries in its region
+   * with smaller sequence numbers have been pushed. For each ReplicationSource we only need to
+   * check the first entry in each region: once it can be pushed, everything behind it in this
+   * ReplicationSource can be pushed too. This method blocks until the entry can be pushed.
+   */
+  void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
+      throws IOException, InterruptedException {
+
+    /**
+     * There are barriers for this region and a position for this peer. N barriers form N
+     * intervals, (b1,b2) (b2,b3) ... (bn,max). Generally there are no logs whose seq id is less
+     * than or equal to the first barrier, and the last interval starts from the last barrier.
+     *
+     * There are several conditions under which we can push right away; otherwise we must block:
+     * 1) Serial replication is not enabled, so we can push all logs just as before. That case
+     *    should never reach this method.
+     * 2) There are no barriers for this region, or the seq id is not greater than the first
+     *    barrier. This mainly happens right after REPLICATION_SCOPE is altered to serial, since
+     *    we cannot guarantee the order of logs written before the alter.
+     * 3) This entry is in the first interval of barriers. We can push it because it is the very
+     *    start of the region. Split/merged regions are also fine because the first section of a
+     *    daughter region lives on the same RS as the parent, and ordering within one RS is
+     *    guaranteed.
+     * 4) The entry's seq id and the position are in the same section, or the position is the
+     *    last number of the previous section, because when a region is opened we write a barrier
+     *    whose value is the last log's id + 1.
+     * 5) The log's seq id is not greater than the position in meta, which means we are retrying.
+     *    This can happen when a RS crashes after saving the replication meta but before saving
+     *    the ZK offset.
+     */
+    List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
+    if (barriers.isEmpty() || seq <= barriers.get(0)) {
+      // Case 2
+      return;
+    }
+    int interval = Collections.binarySearch(barriers, seq);
+    if (interval < 0) {
+      interval = -interval - 1; // get the insertion point if not found
+    }
+    if (interval == 1) {
+      // Case 3
+      return;
+    }
+
+    while (true) {
+      long pos =
+          MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
+      if (seq <= pos) {
+        // Case 5
+        return;
+      }
+      if (pos >= 0) {
+        // Case 4
+        int posInterval = Collections.binarySearch(barriers, pos);
+        if (posInterval < 0) {
+          posInterval = -posInterval - 1; // get the insertion point if not found
+        }
+        if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
+          return;
+        }
+      }
+
+      LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId
+          + " because a previous log has not been pushed: sequence=" + seq + " pos=" + pos
+          + " barriers=" + Arrays.toString(barriers.toArray()));
+      Thread.sleep(replicationWaitTime);
+    }
+  }
+}
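The case analysis above reduces to a small amount of pure logic over the sorted barrier list. A self-contained model of that check, assuming only what the patch shows (barriers sorted ascending, pos == -1 when the peer has pushed nothing yet; class and method names are illustrative):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BarrierCheck {
  /**
   * Decide whether an entry with the given seq id can be pushed to a peer whose
   * last pushed position in this region is pos (-1 if nothing pushed yet).
   * Returns false when the caller must wait and re-read the position later.
   */
  static boolean canPush(List<Long> barriers, long seq, long pos) {
    if (barriers.isEmpty() || seq <= barriers.get(0)) {
      return true; // Case 2: no usable barriers, ordering cannot be guaranteed anyway
    }
    int interval = toInterval(barriers, seq);
    if (interval == 1) {
      return true; // Case 3: first interval, the very start of the region
    }
    if (seq <= pos) {
      return true; // Case 5: already pushed once, this is a retry
    }
    if (pos >= 0) {
      // Case 4: same section, or pos sits exactly at the end of the previous one
      int posInterval = toInterval(barriers, pos);
      return posInterval == interval || pos == barriers.get(interval - 1) - 1;
    }
    return false;
  }

  // Map a sequence id onto its barrier interval, mirroring the binarySearch
  // handling in waitUntilCanBePushed().
  private static int toInterval(List<Long> barriers, long seq) {
    int i = Collections.binarySearch(barriers, seq);
    return i < 0 ? -i - 1 : i;
  }

  public static void main(String[] args) {
    List<Long> barriers = Arrays.asList(10L, 20L, 30L);
    System.out.println(canPush(barriers, 5, -1));  // true: before the first barrier
    System.out.println(canPush(barriers, 15, -1)); // true: first interval
    System.out.println(canPush(barriers, 25, 15)); // false: previous section unfinished
    System.out.println(canPush(barriers, 25, 19)); // true: pos == barrier(20) - 1
  }
}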
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
index 1c59a44..81dadd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
@@ -144,6 +144,30 @@ public class FSTableDescriptors implements TableDescriptors {
         // Enable cache of data blocks in L1 if more than one caching tier deployed:
         // e.g. if using CombinedBlockCache (BucketCache).
         .setCacheDataInL1(true),
+      new HColumnDescriptor(HConstants.REPLICATION_BARRIER_FAMILY)
+        .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
+          HConstants.DEFAULT_HBASE_META_VERSIONS))
+        .setInMemory(true)
+        .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
+          HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
+        .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+        // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
+        .setBloomFilterType(BloomType.NONE)
+        // Enable cache of data blocks in L1 if more than one caching tier deployed:
+        // e.g. if using CombinedBlockCache (BucketCache).
+        .setCacheDataInL1(true),
+      new HColumnDescriptor(HConstants.REPLICATION_POSITION_FAMILY)
+        .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
+          HConstants.DEFAULT_HBASE_META_VERSIONS))
+        .setInMemory(true)
+        .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
+          HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
+        .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+        // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
+        .setBloomFilterType(BloomType.NONE)
+        // Enable cache of data blocks in L1 if more than one caching tier deployed:
+        // e.g. if using CombinedBlockCache (BucketCache).
+        .setCacheDataInL1(true),
       new HColumnDescriptor(HConstants.TABLE_FAMILY)
         // Ten is arbitrary number. Keep versions to help debugging.
         .setMaxVersions(10)
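With these two families added to hbase:meta, the bookkeeping that waitUntilCanBePushed() consults can also be read back directly. Both MetaTableAccessor calls below appear earlier in this patch with these signatures; the wrapper class is a hypothetical debugging aid, not part of the commit:

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class InspectSerialMeta {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      byte[] encodedName = Bytes.toBytes(args[0]); // region's encoded name
      String peerId = args[1];
      // Open-sequence-id barriers recorded for this region.
      List<Long> barriers = MetaTableAccessor.getReplicationBarriers(conn, encodedName);
      // Last position pushed to the given peer for this region.
      long pos = MetaTableAccessor.getReplicationPositionForOnePeer(conn, encodedName, peerId);
      System.out.println("barriers=" + barriers + " position=" + pos);
    }
  }
}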
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index af63b0b..79321b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -22,8 +22,11 @@ package org.apache.hadoop.hbase.wal;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Set;

+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -35,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;

 /**
  * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
@@ -282,6 +286,18 @@ public interface WAL {
       key.setCompressionContext(compressionContext);
     }

+    public boolean hasSerialReplicationScope() {
+      if (getKey().getReplicationScopes() == null || getKey().getReplicationScopes().isEmpty()) {
+        return false;
+      }
+      for (Map.Entry<byte[], Integer> e : getKey().getReplicationScopes().entrySet()) {
+        if (e.getValue() == HConstants.REPLICATION_SCOPE_SERIAL) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     @Override
     public String toString() {
       return this.key + "=" + this.edit;
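The predicate keeps the fast path cheap: a source only pays the per-region bookkeeping for entries that actually carry a serial scope. A hedged sketch of how a reader loop consumes it (the wiring and class name are illustrative; the WAL.Entry accessors are exactly the ones this patch uses):

import java.util.Map;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;

public class SerialScopeFilter {
  /** Track the last sequence id per region, but only for serial-scope entries. */
  static void track(WAL.Entry entry, Map<String, Long> lastPosition) {
    if (!entry.hasSerialReplicationScope()) {
      return; // non-serial scopes keep the old, unordered push path
    }
    String region = Bytes.toString(entry.getKey().getEncodedRegionName());
    lastPosition.put(region, entry.getKey().getSequenceId());
  }
}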
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 8b84452..d750faf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -452,7 +452,7 @@ public class TestMetaTableAccessor {
     List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
     MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);

-    MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
+    MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);

     assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
     assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@@ -481,7 +481,7 @@ public class TestMetaTableAccessor {
     MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);

     MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
-        HConstants.LATEST_TIMESTAMP);
+        HConstants.LATEST_TIMESTAMP, false);

     assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
     assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@@ -609,7 +609,7 @@ public class TestMetaTableAccessor {

     // now merge the regions, effectively deleting the rows for region a and b.
     MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
-        regionInfoA, regionInfoB, sn, 1, masterSystemTime);
+        regionInfoA, regionInfoB, sn, 1, masterSystemTime, false);

     result = meta.get(get);
     serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
@@ -692,7 +692,7 @@ public class TestMetaTableAccessor {
     }
     SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
     long prevCalls = scheduler.numPriorityCalls;
-    MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
+    MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1,
+        false);
     assertTrue(prevCalls < scheduler.numPriorityCalls);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 7d3d2e9..c15ccf4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -1211,7 +1211,7 @@ public class TestAssignmentManagerOnCluster {
   public void testUpdatesRemoteMeta() throws Exception {
     conf.setInt("hbase.regionstatestore.meta.connection", 3);
     final RegionStateStore rss =
-        new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager()));
+        new RegionStateStore(new MyMaster(conf, new ZkCoordinatedStateManager()));
     rss.start();
     // Create 10 threads and make each do 10 puts related to region state update
     Thread[] th = new Thread[10];
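Seen end to end, the feature is opt-in per column family: setting the scope to HConstants.REPLICATION_SCOPE_SERIAL is what makes hasSerialReplicationScope() return true and routes the family's edits through the ordered push path above. A hedged sketch of flipping that switch (table and family names are made up; the Admin-based flow is standard client API, not something this patch prescribes):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class EnableSerialReplication {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      TableName tn = TableName.valueOf("t1");                    // hypothetical table
      HTableDescriptor htd = admin.getTableDescriptor(tn);
      HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes("f")); // hypothetical family
      hcd.setScope(HConstants.REPLICATION_SCOPE_SERIAL);         // serial scope (2)
      admin.modifyTable(tn, htd);
      // htd.hasSerialReplicationScope() should now return true.
    }
  }
}

When ordering cannot yet be satisfied, waitUntilCanBePushed() retries at the interval configured by HConstants.REPLICATION_SERIALLY_WAITING_KEY, as wired into ReplicationSourceManager above.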