Revert "HBASE-9465 Push entries to peer clusters serially" This reverts commit 441bc050b991c14c048617bc443b97f46e21b76f.
Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Signed-off-by: Andrew Purtell <apurt...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ba7a936f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ba7a936f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ba7a936f Branch: refs/heads/branch-1 Commit: ba7a936f74985eb9d974fdc87b0d06cb8cd8473d Parents: 0a284d2 Author: Sean Busbey <bus...@apache.org> Authored: Tue Nov 7 23:50:35 2017 -0600 Committer: zhangduo <zhang...@apache.org> Committed: Fri Feb 23 14:42:15 2018 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/HTableDescriptor.java | 46 +-- .../apache/hadoop/hbase/MetaTableAccessor.java | 243 ++--------- .../client/replication/ReplicationAdmin.java | 14 +- .../org/apache/hadoop/hbase/HConstants.java | 26 -- .../src/main/resources/hbase-default.xml | 14 - .../hbase/protobuf/generated/WALProtos.java | 16 +- hbase-protocol/src/main/protobuf/WAL.proto | 1 - .../org/apache/hadoop/hbase/master/HMaster.java | 9 - .../hadoop/hbase/master/RegionStateStore.java | 43 +- .../master/cleaner/ReplicationMetaCleaner.java | 187 --------- .../RegionMergeTransactionImpl.java | 3 +- .../hbase/regionserver/ReplicationService.java | 1 - .../regionserver/SplitTransactionImpl.java | 2 +- .../replication/regionserver/Replication.java | 14 +- .../regionserver/ReplicationSource.java | 68 +--- .../regionserver/ReplicationSourceManager.java | 87 +--- .../ReplicationSourceWALReaderThread.java | 31 -- .../java/org/apache/hadoop/hbase/wal/WAL.java | 13 - .../hadoop/hbase/TestMetaTableAccessor.java | 10 +- .../hadoop/hbase/client/TestMetaScanner.java | 2 +- .../master/TestAssignmentManagerOnCluster.java | 2 +- .../replication/TestSerialReplication.java | 401 ------------------- .../regionserver/TestGlobalThrottler.java | 2 +- 23 files changed, 69 insertions(+), 1166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 1fd950a..7f48976 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -34,12 +34,13 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.regex.Matcher; +import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -51,7 +52,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.WritableComparable; @@ -1217,18 +1217,6 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> { } /** - * Return true if there are at least one cf whose replication scope is serial. - */ - public boolean hasSerialReplicationScope() { - for (HColumnDescriptor column: getFamilies()){ - if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL){ - return true; - } - } - return false; - } - - /** * Returns the configured replicas per region */ public int getRegionReplication() { @@ -1772,32 +1760,8 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> { .setScope(HConstants.REPLICATION_SCOPE_LOCAL) // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. .setBloomFilterType(BloomType.NONE) - .setCacheDataInL1(true), - new HColumnDescriptor(HConstants.REPLICATION_BARRIER_FAMILY) - .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS, - HConstants.DEFAULT_HBASE_META_VERSIONS)) - .setInMemory(true) - .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, - HConstants.DEFAULT_HBASE_META_BLOCK_SIZE)) - .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. - .setBloomFilterType(BloomType.NONE) - // Enable cache of data blocks in L1 if more than one caching tier deployed: - // e.g. if using CombinedBlockCache (BucketCache). - .setCacheDataInL1(true), - new HColumnDescriptor(HConstants.REPLICATION_POSITION_FAMILY) - .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS, - HConstants.DEFAULT_HBASE_META_VERSIONS)) - .setInMemory(true) - .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, - HConstants.DEFAULT_HBASE_META_BLOCK_SIZE)) - .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. - .setBloomFilterType(BloomType.NONE) - // Enable cache of data blocks in L1 if more than one caching tier deployed: - // e.g. if using CombinedBlockCache (BucketCache). - .setCacheDataInL1(true), - }); + .setCacheDataInL1(true) + }); metaDescriptor.addCoprocessor( "org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint", null, Coprocessor.PRIORITY_SYSTEM, null); http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index c7e3757..3f11558 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -20,20 +20,6 @@ package org.apache.hadoop.hbase; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ServiceException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -63,6 +49,18 @@ import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + /** * Read/write operations on region and assignment information store in * <code>hbase:meta</code>. @@ -109,27 +107,10 @@ public class MetaTableAccessor { * * The actual layout of meta should be encapsulated inside MetaTableAccessor methods, * and should not leak out of it (through Result objects, etc) - * - * For replication serially, there are two column families "rep_barrier", "rep_position" whose - * row key is encodedRegionName. - * rep_barrier:{seqid} => in each time a RS opens a region, it saves the open sequence - * id in this region - * rep_position:{peerid} => to save the max sequence id we have pushed for each peer - * rep_position:_TABLENAME_ => a special cell to save this region's table name, will used when - * we clean old data - * rep_position:_DAUGHTER_ => a special cell to present this region is split or merged, in this - * cell the value is merged encoded name or two split encoded names - * separated by "," */ private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class); - // Save its daughter region(s) when split/merge - private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_"); - // Save its table name because we only know region's encoded name - private static final String tableNamePeer = "_TABLENAME_"; - private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer); - static final byte [] META_REGION_PREFIX; static { // Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX. @@ -981,19 +962,6 @@ public class MetaTableAccessor { return delete; } - public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName) { - byte[] seqBytes = Bytes.toBytes(seq); - return new Put(encodedRegionName) - .addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes) - .addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName); - } - - - public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) { - return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY, - daughterNamePosCq, value); - } - /** * Adds split daughters to the Put */ @@ -1010,24 +978,24 @@ public class MetaTableAccessor { } /** - * Put the passed <code>puts</code> to the <code>hbase:meta</code> table. - * Non-atomic for multi puts. + * Put the passed <code>p</code> to the <code>hbase:meta</code> table. * @param connection connection we're using - * @param puts Put to add to hbase:meta + * @param p Put to add to hbase:meta * @throws IOException */ - static void putToMetaTable(final Connection connection, final Put... puts) throws IOException { - put(getMetaHTable(connection), Arrays.asList(puts)); + static void putToMetaTable(final Connection connection, final Put p) + throws IOException { + put(getMetaHTable(connection), p); } /** * @param t Table to use (will be closed when done). - * @param puts puts to make + * @param p put to make * @throws IOException */ - private static void put(final Table t, final List<Put> puts) throws IOException { + private static void put(final Table t, final Put p) throws IOException { try { - t.put(puts); + t.put(p); } finally { t.close(); } @@ -1153,7 +1121,7 @@ public class MetaTableAccessor { * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this * does not add its daughter's as different rows, but adds information about the daughters * in the same row as the parent. Use - * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)} + * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)} * if you want to do that. * @param meta the Table for META * @param regionInfo region information @@ -1175,7 +1143,7 @@ public class MetaTableAccessor { * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this * does not add its daughter's as different rows, but adds information about the daughters * in the same row as the parent. Use - * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)} + * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)} * if you want to do that. * @param connection connection we're using * @param regionInfo region information @@ -1264,7 +1232,7 @@ public class MetaTableAccessor { */ public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication, - long masterSystemTime, boolean saveBarrier) + long masterSystemTime) throws IOException { Table meta = getMetaHTable(connection); try { @@ -1295,17 +1263,7 @@ public class MetaTableAccessor { byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER); - Mutation[] mutations; - if (saveBarrier) { - Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(), - Bytes.toBytes(mergedRegion.getEncodedName())); - Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(), - Bytes.toBytes(mergedRegion.getEncodedName())); - mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB }; - } else { - mutations = new Mutation[] { putOfMerged, deleteA, deleteB }; - } - multiMutate(meta, tableRow, mutations); + multiMutate(meta, tableRow, putOfMerged, deleteA, deleteB); } finally { meta.close(); } @@ -1321,11 +1279,10 @@ public class MetaTableAccessor { * @param splitA Split daughter region A * @param splitB Split daughter region A * @param sn the location of the region - * @param saveBarrier true if need save replication barrier in meta, used for serial replication */ - public static void splitRegion(final Connection connection, HRegionInfo parent, - HRegionInfo splitA, HRegionInfo splitB, ServerName sn, int regionReplication, - boolean saveBarrier) throws IOException { + public static void splitRegion(final Connection connection, + HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB, + ServerName sn, int regionReplication) throws IOException { Table meta = getMetaHTable(connection); try { HRegionInfo copyOfParent = new HRegionInfo(parent); @@ -1350,17 +1307,8 @@ public class MetaTableAccessor { addEmptyLocation(putB, i); } - Mutation[] mutations; - if (saveBarrier) { - Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(), - Bytes - .toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName())); - mutations = new Mutation[]{putParent, putA, putB, putBarrier}; - } else { - mutations = new Mutation[]{putParent, putA, putB}; - } byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER); - multiMutate(meta, tableRow, mutations); + multiMutate(meta, tableRow, putParent, putA, putB); } finally { meta.close(); } @@ -1418,27 +1366,6 @@ public class MetaTableAccessor { } /** - * Updates the progress of pushing entries to peer cluster. Skip entry if value is -1. - * @param connection connection we're using - * @param peerId the peerId to push - * @param positions map that saving positions for each region - * @throws IOException - */ - public static void updateReplicationPositions(Connection connection, String peerId, - Map<String, Long> positions) throws IOException { - List<Put> puts = new ArrayList<>(); - for (Map.Entry<String, Long> entry : positions.entrySet()) { - long value = Math.abs(entry.getValue()); - Put put = new Put(Bytes.toBytes(entry.getKey())); - put.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId), - Bytes.toBytes(value)); - puts.add(put); - } - getMetaHTable(connection).put(puts); - } - - - /** * Updates the location of the specified region to be the specified server. * <p> * Connects to the specified server which should be hosting the specified @@ -1623,120 +1550,6 @@ public class MetaTableAccessor { } /** - * Get replication position for a peer in a region. - * @param connection connection we're using - * @return the position of this peer, -1 if no position in meta. - */ - public static long getReplicationPositionForOnePeer(Connection connection, - byte[] encodedRegionName, String peerId) throws IOException { - Get get = new Get(encodedRegionName); - get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId)); - Result r = get(getMetaHTable(connection), get); - if (r.isEmpty()) { - return -1; - } - Cell cell = r.rawCells()[0]; - return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()); - } - - /** - * Get replication positions for all peers in a region. - * @param connection connection we're using - * @param encodedRegionName region's encoded name - * @return the map of positions for each peer - */ - public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection, - byte[] encodedRegionName) throws IOException { - Get get = new Get(encodedRegionName); - get.addFamily(HConstants.REPLICATION_POSITION_FAMILY); - Result r = get(getMetaHTable(connection), get); - Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1)); - for (Cell c : r.listCells()) { - if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(), - c.getQualifierOffset(), c.getQualifierLength()) && - !Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(), - c.getQualifierOffset(), c.getQualifierLength())) { - map.put( - Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()), - Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength())); - } - } - return map; - } - - /** - * Get all barriers in all regions. - * @return a map of barrier lists in all regions - * @throws IOException - */ - public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName) - throws IOException { - Get get = new Get(encodedRegionName); - get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); - Result r = get(getMetaHTable(connection), get); - List<Long> list = new ArrayList<>(); - if (!r.isEmpty()) { - for (Cell cell : r.rawCells()) { - list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength())); - } - } - return list; - } - - public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException { - Map<String, List<Long>> map = new HashMap<>(); - Scan scan = new Scan(); - scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); - try (Table t = getMetaHTable(connection); - ResultScanner scanner = t.getScanner(scan)) { - Result result; - while ((result = scanner.next()) != null) { - String key = Bytes.toString(result.getRow()); - List<Long> list = new ArrayList<>(); - for (Cell cell : result.rawCells()) { - list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength())); - } - map.put(key, list); - } - } - return map; - } - - /** - * Get daughter region(s) for a region, only used in serial replication. - * @throws IOException - */ - public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName) - throws IOException { - Get get = new Get(encodedName); - get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq); - Result result = get(getMetaHTable(connection), get); - if (!result.isEmpty()) { - Cell c = result.rawCells()[0]; - return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength()); - } - return null; - } - - /** - * Get the table name for a region, only used in serial replication. - * @throws IOException - */ - public static String getSerialReplicationTableName(Connection connection, byte[] encodedName) - throws IOException { - Get get = new Get(encodedName); - get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq); - Result result = get(getMetaHTable(connection), get); - if (!result.isEmpty()) { - Cell c = result.rawCells()[0]; - return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength()); - } - return null; - } - - /** * Checks whether hbase:meta contains any info:server entry. * @param connection connection we're using * @return true if hbase:meta contains any info:server entry, false if not http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 7bef9ed..55653d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -91,10 +91,8 @@ public class ReplicationAdmin implements Closeable { // only Global for now, can add other type // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc. public static final String REPLICATIONTYPE = "replicationType"; - public static final String REPLICATIONGLOBAL = - Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL); - public static final String REPLICATIONSERIAL = - Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL); + public static final String REPLICATIONGLOBAL = Integer + .toString(HConstants.REPLICATION_SCOPE_GLOBAL); private final Connection connection; // TODO: replication should be managed by master. All the classes except ReplicationAdmin should @@ -488,10 +486,7 @@ public class ReplicationAdmin implements Closeable { HashMap<String, String> replicationEntry = new HashMap<String, String>(); replicationEntry.put(TNAME, tableName); replicationEntry.put(CFNAME, column.getNameAsString()); - replicationEntry.put(REPLICATIONTYPE, - column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ? - REPLICATIONGLOBAL : - REPLICATIONSERIAL); + replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL); replicationColFams.add(replicationEntry); } } @@ -703,8 +698,7 @@ public class ReplicationAdmin implements Closeable { boolean hasDisabled = false; for (HColumnDescriptor hcd : htd.getFamilies()) { - if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL - && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { + if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) { hasDisabled = true; } else { hasEnabled = true; http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index dc44c77..e702236 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -449,20 +449,6 @@ public final class HConstants { /** The catalog family */ public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR); - /** The replication barrier family as a string*/ - public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier"; - - /** The replication barrier family */ - public static final byte [] REPLICATION_BARRIER_FAMILY = - Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR); - - /** The replication barrier family as a string*/ - public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position"; - - /** The replication barrier family */ - public static final byte [] REPLICATION_POSITION_FAMILY = - Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR); - /** The RegionInfo qualifier as a string */ public static final String REGIONINFO_QUALIFIER_STR = "regioninfo"; @@ -658,12 +644,6 @@ public final class HConstants { public static final int REPLICATION_SCOPE_GLOBAL = 1; /** - * Scope tag for serially scoped data - * This data will be replicated to all peers by the order of sequence id. - */ - public static final int REPLICATION_SCOPE_SERIAL = 2; - - /** * Default cluster ID, cannot be used to identify a cluster so a key with * this value means it wasn't meant for replication. */ @@ -914,12 +894,6 @@ public final class HConstants { public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false; /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */ public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id"; - - public static final String - REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs"; - public static final long - REPLICATION_SERIALLY_WAITING_DEFAULT = 10000; - /** * Max total size of buffered entries in all replication peers. It will prevent server getting * OOM if there are many peers. Default value is 256MB which is four times to default http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-common/src/main/resources/hbase-default.xml ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 5908359..e1ae0ef 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1542,20 +1542,6 @@ possible configurations would overwhelm and obscure the important. slave clusters. The default of 10 will rarely need to be changed. </description> </property> - <property> - <name>hbase.serial.replication.waitingMs</name> - <value>10000</value> - <description> - By default, in replication we can not make sure the order of operations in slave cluster is - same as the order in master. If set REPLICATION_SCOPE to 2, we will push edits by the order - of written. This configure is to set how long (in ms) we will wait before next checking if a - log can not push right now because there are some logs written before it have not been pushed. - A larger waiting will decrease the number of queries on hbase:meta but will enlarge the delay - of replication. This feature relies on zk-less assignment, and conflicts with distributed log - replay. So users must set hbase.assignment.usezk and hbase.master.distributed.log.replay to - false to support it. - </description> - </property> <!-- Static Web User Filter properties. --> <property> <description> http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index a466e6c..e0efab4 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -21,10 +21,6 @@ public final class WALProtos { * <code>REPLICATION_SCOPE_GLOBAL = 1;</code> */ REPLICATION_SCOPE_GLOBAL(1, 1), - /** - * <code>REPLICATION_SCOPE_SERIAL = 2;</code> - */ - REPLICATION_SCOPE_SERIAL(2, 2), ; /** @@ -35,10 +31,6 @@ public final class WALProtos { * <code>REPLICATION_SCOPE_GLOBAL = 1;</code> */ public static final int REPLICATION_SCOPE_GLOBAL_VALUE = 1; - /** - * <code>REPLICATION_SCOPE_SERIAL = 2;</code> - */ - public static final int REPLICATION_SCOPE_SERIAL_VALUE = 2; public final int getNumber() { return value; } @@ -47,7 +39,6 @@ public final class WALProtos { switch (value) { case 0: return REPLICATION_SCOPE_LOCAL; case 1: return REPLICATION_SCOPE_GLOBAL; - case 2: return REPLICATION_SCOPE_SERIAL; default: return null; } } @@ -12023,11 +12014,10 @@ public final class WALProtos { "e.pb.StoreDescriptor\022$\n\006server\030\006 \001(\0132\024.h" + "base.pb.ServerName\022\023\n\013region_name\030\007 \001(\014\"" + ".\n\tEventType\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_" + - "CLOSE\020\001\"\014\n\nWALTrailer*d\n\tScopeType\022\033\n\027RE" + + "CLOSE\020\001\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027RE" + "PLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_S" + - "COPE_GLOBAL\020\001\022\034\n\030REPLICATION_SCOPE_SERIA" + - "L\020\002B?\n*org.apache.hadoop.hbase.protobuf." + - "generatedB\tWALProtosH\001\210\001\000\240\001\001" + "COPE_GLOBAL\020\001B?\n*org.apache.hadoop.hbase" + + ".protobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-protocol/src/main/protobuf/WAL.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index 08925f8..a888686 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -77,7 +77,6 @@ message WALKey { enum ScopeType { REPLICATION_SCOPE_LOCAL = 0; REPLICATION_SCOPE_GLOBAL = 1; - REPLICATION_SCOPE_SERIAL = 2; } message FamilyScope { http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index f8bbc65..6951098 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -99,7 +99,6 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; -import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore; @@ -331,7 +330,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server { CatalogJanitor catalogJanitorChore; private ReplicationZKLockCleanerChore replicationZKLockCleanerChore; private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore; - private ReplicationMetaCleaner replicationMetaCleaner; private LogCleaner logCleaner; private HFileCleaner hfileCleaner; @@ -1250,12 +1248,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server { } catch (Exception e) { LOG.error("start replicationZKNodeCleanerChore failed", e); } - try { - replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval); - getChoreService().scheduleChore(replicationMetaCleaner); - } catch (Exception e) { - LOG.error("start ReplicationMetaCleaner failed", e); - } } @Override @@ -1291,7 +1283,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server { if (this.hfileCleaner != null) this.hfileCleaner.cancel(true); if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true); if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true); - if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true); if (this.quotaManager != null) this.quotaManager.stop(); if (this.activeMasterManager != null) this.activeMasterManager.stop(); if (this.serverManager != null) this.serverManager.stop(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index 2d445e2..476b4d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -17,15 +17,11 @@ */ package org.apache.hadoop.hbase.master; -import com.google.common.base.Preconditions; - import java.io.IOException; import java.util.Arrays; -import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -49,6 +45,8 @@ import org.apache.hadoop.hbase.util.MultiHConnection; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.zookeeper.KeeperException; +import com.google.common.base.Preconditions; + /** * A helper to persist region state in meta. We may change this class * to StateStore later if we also use it to store other states in meta @@ -65,7 +63,7 @@ public class RegionStateStore { private volatile boolean initialized; private final boolean noPersistence; - private final MasterServices server; + private final Server server; /** * Returns the {@link ServerName} from catalog table {@link Result} @@ -135,7 +133,7 @@ public class RegionStateStore { State.SPLITTING_NEW, State.MERGED)); } - RegionStateStore(final MasterServices server) { + RegionStateStore(final Server server) { Configuration conf = server.getConfiguration(); // No need to persist if using ZK but not migrating noPersistence = ConfigUtil.useZKForAssignment(conf) @@ -200,41 +198,31 @@ public class RegionStateStore { State state = newState.getState(); int replicaId = hri.getReplicaId(); - Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri)); + Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri)); StringBuilder info = new StringBuilder("Updating hbase:meta row "); info.append(hri.getRegionNameAsString()).append(" with state=").append(state); if (serverName != null && !serverName.equals(oldServer)) { - metaPut.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId), + put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId), Bytes.toBytes(serverName.getServerName())); info.append(", sn=").append(serverName); } if (openSeqNum >= 0) { Preconditions.checkArgument(state == State.OPEN && serverName != null, "Open region should be on a server"); - MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, replicaId); + MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId); info.append(", openSeqNum=").append(openSeqNum); info.append(", server=").append(serverName); } - metaPut.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId), + put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId), Bytes.toBytes(state.name())); LOG.info(info); - HTableDescriptor descriptor = server.getTableDescriptors().get(hri.getTable()); - boolean serial = false; - if (descriptor != null) { - serial = server.getTableDescriptors().get(hri.getTable()).hasSerialReplicationScope(); - } - boolean shouldPutBarrier = serial && state == State.OPEN; + // Persist the state change to meta if (metaRegion != null) { try { // Assume meta is pinned to master. // At least, that's what we want. - metaRegion.put(metaPut); - if (shouldPutBarrier) { - Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(), - openSeqNum, hri.getTable().getName()); - metaRegion.put(barrierPut); - } + metaRegion.put(put); return; // Done here } catch (Throwable t) { // In unit tests, meta could be moved away by intention @@ -253,10 +241,7 @@ public class RegionStateStore { } } // Called when meta is not on master - List<Put> list = shouldPutBarrier ? - Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(), - openSeqNum, hri.getTable().getName())) : Arrays.asList(metaPut); - multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null); + multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, null); } catch (IOException ioe) { LOG.error("Failed to persist region state " + newState, ioe); @@ -266,14 +251,12 @@ public class RegionStateStore { void splitRegion(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException { - MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication, - server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope()); + MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication); } void mergeRegions(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException { MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication, - EnvironmentEdgeManager.currentTime(), - server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope()); + EnvironmentEdgeManager.currentTime()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java deleted file mode 100644 index 41864b9..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.cleaner; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * This chore is to clean up the useless data in hbase:meta which is used by serial replication. - */ -@InterfaceAudience.Private -public class ReplicationMetaCleaner extends ScheduledChore { - - private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class); - - private ReplicationAdmin replicationAdmin; - private MasterServices master; - - public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period) - throws IOException { - super("ReplicationMetaCleaner", stoppable, period); - this.master = master; - replicationAdmin = new ReplicationAdmin(master.getConfiguration()); - } - - @Override - protected void chore() { - try { - Map<String, HTableDescriptor> tables = master.getTableDescriptors().getAll(); - Map<String, Set<String>> serialTables = new HashMap<>(); - for (Map.Entry<String, HTableDescriptor> entry : tables.entrySet()) { - boolean hasSerialScope = false; - for (HColumnDescriptor column : entry.getValue().getFamilies()) { - if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) { - hasSerialScope = true; - break; - } - } - if (hasSerialScope) { - serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<String>()); - } - } - if (serialTables.isEmpty()){ - return; - } - - Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs(); - for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) { - for (Map.Entry<byte[], byte[]> map : entry.getValue().getPeerData() - .entrySet()) { - String tableName = Bytes.toString(map.getKey()); - if (serialTables.containsKey(tableName)) { - serialTables.get(tableName).add(entry.getKey()); - break; - } - } - } - - Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection()); - for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) { - String encodedName = entry.getKey(); - byte[] encodedBytes = Bytes.toBytes(encodedName); - boolean canClearRegion = false; - Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer( - master.getConnection(), encodedBytes); - if (posMap.isEmpty()) { - continue; - } - - String tableName = MetaTableAccessor.getSerialReplicationTableName( - master.getConnection(), encodedBytes); - Set<String> confPeers = serialTables.get(tableName); - if (confPeers == null) { - // This table doesn't exist or all cf's scope is not serial any more, we can clear meta. - canClearRegion = true; - } else { - if (!allPeersHavePosition(confPeers, posMap)) { - continue; - } - - String daughterValue = MetaTableAccessor - .getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes); - if (daughterValue != null) { - //this region is merged or split - boolean allDaughterStart = true; - String[] daughterRegions = daughterValue.split(","); - for (String daughter : daughterRegions) { - byte[] region = Bytes.toBytes(daughter); - if (!MetaTableAccessor.getReplicationBarriers( - master.getConnection(), region).isEmpty() && - !allPeersHavePosition(confPeers, - MetaTableAccessor - .getReplicationPositionForAllPeer(master.getConnection(), region))) { - allDaughterStart = false; - break; - } - } - if (allDaughterStart) { - canClearRegion = true; - } - } - } - if (canClearRegion) { - Delete delete = new Delete(encodedBytes); - delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY); - delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); - try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) { - metaTable.delete(delete); - } - } else { - - // Barriers whose seq is larger than min pos of all peers, and the last barrier whose seq - // is smaller than min pos should be kept. All other barriers can be deleted. - - long minPos = Long.MAX_VALUE; - for (Map.Entry<String, Long> pos : posMap.entrySet()) { - minPos = Math.min(minPos, pos.getValue()); - } - List<Long> barriers = entry.getValue(); - int index = Collections.binarySearch(barriers, minPos); - if (index < 0) { - index = -index - 1; - } - Delete delete = new Delete(encodedBytes); - for (int i = 0; i < index - 1; i++) { - delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i))); - } - try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) { - metaTable.delete(delete); - } - } - - } - - } catch (IOException e) { - LOG.error("Exception during cleaning up.", e); - } - - } - - private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap) - throws IOException { - for(String peer:peers){ - if (!posMap.containsKey(peer)){ - return false; - } - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java index 9e31fb0..03aa059 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java @@ -434,8 +434,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { if (metaEntries.isEmpty()) { MetaTableAccessor.mergeRegions(server.getConnection(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), - server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime, - false); + server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime); } else { mergeRegionsAndPutMetaEntries(server.getConnection(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index 95da92a..25a27a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -22,7 +22,6 @@ import java.io.IOException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java index a3eea6d..f9a5d31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java @@ -338,7 +338,7 @@ public class SplitTransactionImpl implements SplitTransaction { MetaTableAccessor.splitRegion(server.getConnection(), parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions.getSecond().getRegionInfo(), server.getServerName(), - parent.getTableDesc().getRegionReplication(), false); + parent.getTableDesc().getRegionReplication()); } else { offlineParentInMetaAndputMetaEntries(server.getConnection(), parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index b2b403b..d6f48b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; @@ -296,19 +295,8 @@ public class Replication extends WALActionsListener.Base implements if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell); } else { - WALProtos.RegionEventDescriptor maybeEvent = WALEdit.getRegionEventDescriptor(cell); - if (maybeEvent != null && (maybeEvent.getEventType() == - WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { - // In serially replication, we use scopes when reading close marker. - for (HColumnDescriptor cf : families) { - if (cf.getScope() != REPLICATION_SCOPE_LOCAL) { - scopes.put(cf.getName(), cf.getScope()); - } - } - } - // Skip the flush/compaction + // Skip the flush/compaction/region events continue; - } } else if (hasReplication) { byte[] family = CellUtil.cloneFamily(cell); http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 10f2e7b..add1043 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -18,9 +18,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; @@ -49,7 +46,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -76,6 +72,10 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Service; + /** * Class that handles the source of a replication stream. * Currently does not handle more than 1 slave @@ -105,8 +105,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf private ReplicationQueueInfo replicationQueueInfo; // id of the peer cluster this source replicates to private String peerId; - - String actualPeerId; // The manager of all sources to which we ping back our progress private ReplicationSourceManager manager; // Should we stop everything? @@ -191,8 +189,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); - this.actualPeerId = replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); this.replicationEndpoint = replicationEndpoint; @@ -523,16 +519,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // Current state of the worker thread private WorkerState state; ReplicationSourceWALReaderThread entryReader; - // Use guava cache to set ttl for each key - private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder() - .expireAfterAccess(1, TimeUnit.DAYS).build( - new CacheLoader<String, Boolean>() { - @Override - public Boolean load(String key) throws Exception { - return false; - } - } - ); public ReplicationSourceShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo, @@ -568,9 +554,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf try { WALEntryBatch entryBatch = entryReader.take(); - for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) { - waitingUntilCanPush(entry); - } shipEdits(entryBatch); releaseBufferQuota((int) entryBatch.getHeapSize()); if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty() @@ -611,33 +594,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } } - private void waitingUntilCanPush(Map.Entry<String, Long> entry) { - String key = entry.getKey(); - long seq = entry.getValue(); - boolean deleteKey = false; - if (seq <= 0) { - // There is a REGION_CLOSE marker, we can not continue skipping after this entry. - deleteKey = true; - seq = -seq; - } - - if (!canSkipWaitingSet.getUnchecked(key)) { - try { - manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId); - } catch (IOException e) { - LOG.error("waitUntilCanBePushed fail", e); - stopper.stop("waitUntilCanBePushed fail"); - } catch (InterruptedException e) { - LOG.warn("waitUntilCanBePushed interrupted", e); - Thread.currentThread().interrupt(); - } - canSkipWaitingSet.put(key, true); - } - if (deleteKey) { - canSkipWaitingSet.invalidate(key); - } - } - private void cleanUpHFileRefs(WALEdit edit) throws IOException { String peerId = peerClusterZnode; if (peerId.contains("-")) { @@ -682,8 +638,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf int sleepMultiplier = 0; if (entries.isEmpty()) { if (lastLoggedPosition != lastReadPosition) { - // Save positions to meta table before zk. - updateSerialRepPositions(entryBatch.getLastSeqIds()); updateLogPosition(lastReadPosition); // if there was nothing to ship and it's not an error // set "ageOfLastShippedOp" to <now> to indicate that we're current @@ -738,10 +692,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf for (int i = 0; i < size; i++) { cleanUpHFileRefs(entries.get(i).getEdit()); } - - // Save positions to meta table before zk. - updateSerialRepPositions(entryBatch.getLastSeqIds()); - //Log and clean up WAL logs updateLogPosition(lastReadPosition); } @@ -770,16 +720,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } } - private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) { - try { - MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId, - lastPositionsForSerialScope); - } catch (IOException e) { - LOG.error("updateReplicationPositions fail", e); - stopper.stop("updateReplicationPositions fail"); - } - } - private void updateLogPosition(long lastReadPosition) { manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition, this.replicationQueueInfo.isQueueRecovered(), false); http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 6ec30de..63bba8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -50,13 +49,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @@ -70,7 +66,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.DefaultWALProvider; /** @@ -125,8 +120,6 @@ public class ReplicationSourceManager implements ReplicationListener { private final Random rand; private final boolean replicationForBulkLoadDataEnabled; - private Connection connection; - private long replicationWaitTime; private AtomicLong totalBufferUsed = new AtomicLong(); @@ -145,7 +138,7 @@ public class ReplicationSourceManager implements ReplicationListener { public ReplicationSourceManager(final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, final Configuration conf, final Server server, final FileSystem fs, final Path logDir, - final Path oldLogDir, final UUID clusterId) throws IOException { + final Path oldLogDir, final UUID clusterId) { //CopyOnWriteArrayList is thread-safe. //Generally, reading is more than modifying. this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>(); @@ -182,9 +175,6 @@ public class ReplicationSourceManager implements ReplicationListener { replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); - this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, - HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT); - connection = ConnectionFactory.createConnection(conf); } /** @@ -830,10 +820,6 @@ public class ReplicationSourceManager implements ReplicationListener { */ public ReplicationPeers getReplicationPeers() {return this.replicationPeers;} - public Connection getConnection() { - return this.connection; - } - /** * Get a string representation of all the sources' metrics */ @@ -860,75 +846,4 @@ public class ReplicationSourceManager implements ReplicationListener { public void cleanUpHFileRefs(String peerId, List<String> files) { this.replicationQueues.removeHFileRefs(peerId, files); } - - /** - * Whether an entry can be pushed to the peer or not right now. - * If we enable serial replication, we can not push the entry until all entries in its region - * whose sequence numbers are smaller than this entry have been pushed. - * For each ReplicationSource, we need only check the first entry in each region, as long as it - * can be pushed, we can push all in this ReplicationSource. - * This method will be blocked until we can push. - * @return the first barrier of entry's region, or -1 if there is no barrier. It is used to - * prevent saving positions in the region of no barrier. - */ - void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId) - throws IOException, InterruptedException { - - /** - * There are barriers for this region and position for this peer. N barriers form N intervals, - * (b1,b2) (b2,b3) ... (bn,max). Generally, there is no logs whose seq id is not greater than - * the first barrier and the last interval is start from the last barrier. - * - * There are several conditions that we can push now, otherwise we should block: - * 1) "Serial replication" is not enabled, we can push all logs just like before. This case - * should not call this method. - * 2) There is no barriers for this region, or the seq id is smaller than the first barrier. - * It is mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the - * order of logs that is written before altering. - * 3) This entry is in the first interval of barriers. We can push them because it is the - * start of a region. Splitting/merging regions are also ok because the first section of - * daughter region is in same region of parents and the order in one RS is guaranteed. - * 4) If the entry's seq id and the position are in same section, or the pos is the last - * number of previous section. Because when open a region we put a barrier the number - * is the last log's id + 1. - * 5) Log's seq is smaller than pos in meta, we are retrying. It may happen when a RS crashes - * after save replication meta and before save zk offset. - */ - List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName); - if (barriers.isEmpty() || seq <= barriers.get(0)) { - // Case 2 - return; - } - int interval = Collections.binarySearch(barriers, seq); - if (interval < 0) { - interval = -interval - 1;// get the insert position if negative - } - if (interval == 1) { - // Case 3 - return; - } - - while (true) { - long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId); - if (seq <= pos) { - // Case 5 - } - if (pos >= 0) { - // Case 4 - int posInterval = Collections.binarySearch(barriers, pos); - if (posInterval < 0) { - posInterval = -posInterval - 1;// get the insert position if negative - } - if (posInterval == interval || pos == barriers.get(interval - 1) - 1) { - return; - } - } - - LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId - + " because previous log has not been pushed: sequence=" + seq + " pos=" + pos - + " barriers=" + Arrays.toString(barriers.toArray())); - Thread.sleep(replicationWaitTime); - } - } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index 40828b7..306ba8f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -141,10 +141,6 @@ public class ReplicationSourceWALReaderThread extends Thread { batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); } Entry entry = entryStream.next(); - if (updateSerialReplPos(batch, entry)) { - batch.lastWalPosition = entryStream.getPosition(); - break; - } entry = filterEntry(entry); if (entry != null) { WALEdit edit = entry.getEdit(); @@ -246,33 +242,6 @@ public class ReplicationSourceWALReaderThread extends Thread { } /** - * @return true if we should stop reading because we're at REGION_CLOSE - */ - private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException { - if (entry.hasSerialReplicationScope()) { - String key = Bytes.toString(entry.getKey().getEncodedRegionName()); - batch.setLastPosition(key, entry.getKey().getSequenceId()); - if (!entry.getEdit().getCells().isEmpty()) { - WALProtos.RegionEventDescriptor maybeEvent = - WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); - if (maybeEvent != null && maybeEvent - .getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) { - // In serially replication, if we move a region to another RS and move it back, we may - // read logs crossing two sections. We should break at REGION_CLOSE and push the first - // section first in case of missing the middle section belonging to the other RS. - // In a worker thread, if we can push the first log of a region, we can push all logs - // in the same region without waiting until we read a close marker because next time - // we read logs in this region, it must be a new section and not adjacent with this - // region. Mark it negative. - batch.setLastPosition(key, -entry.getKey().getSequenceId()); - return true; - } - } - } - return false; - } - - /** * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a * batch to become available * @return A batch of entries, along with the position in the log after reading the batch http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 2e34b64..cc2f42a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.wal; import java.io.Closeable; import java.io.IOException; -import java.util.Map; import java.util.Set; import org.apache.hadoop.hbase.HConstants; @@ -271,18 +270,6 @@ public interface WAL extends Closeable { key.setCompressionContext(compressionContext); } - public boolean hasSerialReplicationScope () { - if (getKey().getScopes() == null || getKey().getScopes().isEmpty()) { - return false; - } - for (Map.Entry<byte[], Integer> e:getKey().getScopes().entrySet()) { - if (e.getValue() == HConstants.REPLICATION_SCOPE_SERIAL){ - return true; - } - } - return false; - } - @Override public String toString() { return this.key + "=" + this.edit; http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index 98fff27..cb2494b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -443,7 +443,7 @@ public class TestMetaTableAccessor { List<HRegionInfo> regionInfos = Lists.newArrayList(parent); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false); + MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3); assertEmptyMetaLocation(meta, splitA.getRegionName(), 1); assertEmptyMetaLocation(meta, splitA.getRegionName(), 2); @@ -472,7 +472,7 @@ public class TestMetaTableAccessor { MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3, - HConstants.LATEST_TIMESTAMP, false); + HConstants.LATEST_TIMESTAMP); assertEmptyMetaLocation(meta, merged.getRegionName(), 1); assertEmptyMetaLocation(meta, merged.getRegionName(), 2); @@ -556,7 +556,7 @@ public class TestMetaTableAccessor { // now merge the regions, effectively deleting the rows for region a and b. MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, - regionInfoA, regionInfoB, sn, 1, masterSystemTime, false); + regionInfoA, regionInfoB, sn, 1, masterSystemTime); result = meta.get(get); serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY, @@ -639,7 +639,7 @@ public class TestMetaTableAccessor { } SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler(); long prevCalls = scheduler.numPriorityCalls; - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB,loc.getServerName(),1,false); + MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1); assertTrue(prevCalls < scheduler.numPriorityCalls); } @@ -661,7 +661,7 @@ public class TestMetaTableAccessor { List<HRegionInfo> regionInfos = Lists.newArrayList(parent); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false); + MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3); Get get1 = new Get(splitA.getRegionName()); Result resultA = meta.get(get1); Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY, http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java index bca8cf3..a91560e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java @@ -166,7 +166,7 @@ public class TestMetaScanner { end); MetaTableAccessor.splitRegion(connection, - parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1, false); + parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1); Threads.sleep(random.nextInt(200)); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java index 78b23c0..69dfa40 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java @@ -1317,7 +1317,7 @@ public class TestAssignmentManagerOnCluster { } conf.setInt("hbase.regionstatestore.meta.connection", 3); final RegionStateStore rss = - new RegionStateStore(new MyMaster(conf, new ZkCoordinatedStateManager())); + new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager())); rss.start(); // Create 10 threads and make each do 10 puts related to region state update Thread[] th = new Thread[10];