HBASE-20048 Revert serial replication feature
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4ddfdaff Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4ddfdaff Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4ddfdaff Branch: refs/heads/HBASE-19397-branch-2 Commit: 4ddfdaffdcf83d50aac26fcff5e1ac4ecd575335 Parents: 8a22e41 Author: zhangduo <zhang...@apache.org> Authored: Fri Feb 23 08:51:37 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Feb 23 13:54:10 2018 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/HTableDescriptor.java | 8 - .../apache/hadoop/hbase/MetaTableAccessor.java | 296 ++------------ .../hadoop/hbase/client/TableDescriptor.java | 9 +- .../hbase/client/TableDescriptorBuilder.java | 17 +- .../client/replication/ReplicationAdmin.java | 10 +- .../client/TestTableDescriptorBuilder.java | 25 +- .../org/apache/hadoop/hbase/HConstants.java | 33 -- .../src/main/resources/hbase-default.xml | 13 - hbase-protocol/src/main/protobuf/WAL.proto | 1 - .../org/apache/hadoop/hbase/master/HMaster.java | 8 +- .../master/assignment/RegionStateStore.java | 23 +- .../master/cleaner/ReplicationMetaCleaner.java | 191 --------- .../hbase/regionserver/HRegionServer.java | 77 ++-- .../hbase/regionserver/wal/FSWALEntry.java | 1 + .../RecoveredReplicationSourceShipper.java | 3 +- .../regionserver/ReplicationSourceManager.java | 121 +----- .../regionserver/ReplicationSourceShipper.java | 76 +--- .../ReplicationSourceWALReader.java | 52 +-- .../hbase/snapshot/RestoreSnapshotHelper.java | 2 +- .../hadoop/hbase/util/FSTableDescriptors.java | 30 -- .../java/org/apache/hadoop/hbase/wal/WAL.java | 19 +- .../hadoop/hbase/TestMetaTableAccessor.java | 10 +- .../regionserver/TestRegionServerMetrics.java | 2 +- .../replication/TestSerialReplication.java | 400 ------------------- .../regionserver/TestGlobalThrottler.java | 2 +- src/main/asciidoc/_chapters/ops_mgt.adoc | 41 +- 26 files changed, 87 insertions(+), 1383 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index c9807c3..e512b2c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -538,14 +538,6 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr } /** - * Return true if there are at least one cf whose replication scope is serial. - */ - @Override - public boolean hasSerialReplicationScope() { - return delegatee.hasSerialReplicationScope(); - } - - /** * Returns the configured replicas per region */ @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index dad9aef..7d00f92 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -17,13 +17,14 @@ */ package org.apache.hadoop.hbase; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -33,7 +34,6 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell.Type; import org.apache.hadoop.hbase.client.Connection; @@ -71,9 +71,8 @@ import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import edu.umd.cs.findbugs.annotations.NonNull; -import edu.umd.cs.findbugs.annotations.Nullable; /** * Read/write operations on region and assignment information store in @@ -123,34 +122,14 @@ public class MetaTableAccessor { * region is the result of a merge * info:mergeB => contains a serialized HRI for the second parent region if the * region is the result of a merge + * * The actual layout of meta should be encapsulated inside MetaTableAccessor methods, * and should not leak out of it (through Result objects, etc) - * - * For replication serially, there are three column families "rep_barrier", "rep_position" and - * "rep_meta" whose row key is encodedRegionName. - * rep_barrier:{seqid} => in each time a RS opens a region, it saves the open sequence - * id in this region - * rep_position:{peerid} => to save the max sequence id we have pushed for each peer - * rep_meta:_TABLENAME_ => a special cell to save this region's table name, will used when - * we clean old data - * rep_meta:_DAUGHTER_ => a special cell to present this region is split or merged, in this - * cell the value is merged encoded name or two split encoded names - * separated by "," - * rep_meta:_PARENT_ => a special cell to present this region's parent region(s), in this - * cell the value is encoded name of one or two parent regions - * separated by "," */ private static final Logger LOG = LoggerFactory.getLogger(MetaTableAccessor.class); private static final Logger METALOG = LoggerFactory.getLogger("org.apache.hadoop.hbase.META"); - // Save its daughter/parent region(s) when split/merge - private static final byte[] daughterNameCq = Bytes.toBytes("_DAUGHTER_"); - private static final byte[] parentNameCq = Bytes.toBytes("_PARENT_"); - - // Save its table name because we only know region's encoded name - private static final byte[] tableNameCq = Bytes.toBytes("_TABLENAME_"); - static final byte [] META_REGION_PREFIX; static { // Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX. @@ -1352,56 +1331,6 @@ public class MetaTableAccessor { return delete; } - public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName) - throws IOException { - byte[] seqBytes = Bytes.toBytes(seq); - Put put = new Put(encodedRegionName); - put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) - .setRow(put.getRow()) - .setFamily(HConstants.REPLICATION_BARRIER_FAMILY) - .setQualifier(seqBytes) - .setTimestamp(put.getTimeStamp()) - .setType(Type.Put) - .setValue(seqBytes) - .build()) - .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) - .setRow(put.getRow()) - .setFamily(HConstants.REPLICATION_META_FAMILY) - .setQualifier(tableNameCq) - .setTimestamp(put.getTimeStamp()) - .setType(Cell.Type.Put) - .setValue(tableName) - .build()); - return put; - } - - - public static Put makeDaughterPut(byte[] encodedRegionName, byte[] value) throws IOException { - Put put = new Put(encodedRegionName); - put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) - .setRow(put.getRow()) - .setFamily(HConstants.REPLICATION_META_FAMILY) - .setQualifier(daughterNameCq) - .setTimestamp(put.getTimeStamp()) - .setType(Type.Put) - .setValue(value) - .build()); - return put; - } - - public static Put makeParentPut(byte[] encodedRegionName, byte[] value) throws IOException { - Put put = new Put(encodedRegionName); - put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) - .setRow(put.getRow()) - .setFamily(HConstants.REPLICATION_META_FAMILY) - .setQualifier(parentNameCq) - .setTimestamp(put.getTimeStamp()) - .setType(Type.Put) - .setValue(value) - .build()); - return put; - } - /** * Adds split daughters to the Put */ @@ -1431,26 +1360,25 @@ public class MetaTableAccessor { } /** - * Put the passed <code>puts</code> to the <code>hbase:meta</code> table. - * Non-atomic for multi puts. + * Put the passed <code>p</code> to the <code>hbase:meta</code> table. * @param connection connection we're using - * @param puts Put to add to hbase:meta + * @param p Put to add to hbase:meta * @throws IOException */ - public static void putToMetaTable(final Connection connection, final Put... puts) + static void putToMetaTable(final Connection connection, final Put p) throws IOException { - put(getMetaHTable(connection), Arrays.asList(puts)); + put(getMetaHTable(connection), p); } /** * @param t Table to use (will be closed when done). - * @param puts puts to make + * @param p put to make * @throws IOException */ - private static void put(final Table t, final List<Put> puts) throws IOException { + private static void put(final Table t, final Put p) throws IOException { try { - debugLogMutations(puts); - t.put(puts); + debugLogMutation(p); + t.put(p); } finally { t.close(); } @@ -1567,7 +1495,7 @@ public class MetaTableAccessor { * Adds daughter region infos to hbase:meta row for the specified region. Note that this does not * add its daughter's as different rows, but adds information about the daughters in the same row * as the parent. Use - * {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName,int,boolean)} + * {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName,int)} * if you want to do that. * @param connection connection we're using * @param regionInfo RegionInfo of parent region @@ -1575,7 +1503,7 @@ public class MetaTableAccessor { * @param splitB second split daughter of the parent regionInfo * @throws IOException if problem connecting or updating meta */ - public static void addSpiltsToParent(Connection connection, RegionInfo regionInfo, + public static void addSplitsToParent(Connection connection, RegionInfo regionInfo, RegionInfo splitA, RegionInfo splitB) throws IOException { Table meta = getMetaHTable(connection); try { @@ -1590,7 +1518,11 @@ public class MetaTableAccessor { } /** - * Adds a hbase:meta row for the specified new region. Initial state of new region is CLOSED. + * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this + * does not add its daughter's as different rows, but adds information about the daughters + * in the same row as the parent. Use + * {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName, int)} + * if you want to do that. * @param connection connection we're using * @param regionInfo region information * @throws IOException if problem connecting or updating meta @@ -1651,12 +1583,11 @@ public class MetaTableAccessor { * @param regionB * @param sn the location of the region * @param masterSystemTime - * @param saveBarrier true if need save replication barrier in meta, used for serial replication * @throws IOException */ public static void mergeRegions(final Connection connection, RegionInfo mergedRegion, RegionInfo regionA, RegionInfo regionB, ServerName sn, int regionReplication, - long masterSystemTime, boolean saveBarrier) + long masterSystemTime) throws IOException { Table meta = getMetaHTable(connection); try { @@ -1707,20 +1638,7 @@ public class MetaTableAccessor { byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER); - Mutation[] mutations; - if (saveBarrier) { - Put putBarrierA = makeDaughterPut(regionA.getEncodedNameAsBytes(), - mergedRegion.getEncodedNameAsBytes()); - Put putBarrierB = makeDaughterPut(regionB.getEncodedNameAsBytes(), - mergedRegion.getEncodedNameAsBytes()); - Put putDaughter = makeParentPut(mergedRegion.getEncodedNameAsBytes(), Bytes.toBytes( - regionA.getEncodedName() + "," + regionB.getEncodedName())); - mutations = new Mutation[] { putOfMerged, deleteA, deleteB, - putBarrierA, putBarrierB, putDaughter}; - } else { - mutations = new Mutation[] { putOfMerged, deleteA, deleteB }; - } - multiMutate(connection, meta, tableRow, mutations); + multiMutate(connection, meta, tableRow, putOfMerged, deleteA, deleteB); } finally { meta.close(); } @@ -1736,11 +1654,9 @@ public class MetaTableAccessor { * @param splitA Split daughter region A * @param splitB Split daughter region A * @param sn the location of the region - * @param saveBarrier true if need save replication barrier in meta, used for serial replication */ - public static void splitRegion(final Connection connection, RegionInfo parent, - RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication, - boolean saveBarrier) throws IOException { + public static void splitRegion(final Connection connection, RegionInfo parent, RegionInfo splitA, + RegionInfo splitB, ServerName sn, int regionReplication) throws IOException { Table meta = getMetaHTable(connection); try { //Put for parent @@ -1771,21 +1687,8 @@ public class MetaTableAccessor { addEmptyLocation(putB, i); } - Mutation[] mutations; - if (saveBarrier) { - Put parentPut = makeDaughterPut(parent.getEncodedNameAsBytes(), - Bytes.toBytes(splitA.getEncodedName() + "," + splitB.getEncodedName())); - Put daughterPutA = makeParentPut(splitA.getEncodedNameAsBytes(), - parent.getEncodedNameAsBytes()); - Put daughterPutB = makeParentPut(splitB.getEncodedNameAsBytes(), - parent.getEncodedNameAsBytes()); - - mutations = new Mutation[]{putParent, putA, putB, parentPut, daughterPutA, daughterPutB}; - } else { - mutations = new Mutation[]{putParent, putA, putB}; - } byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER); - multiMutate(connection, meta, tableRow, mutations); + multiMutate(connection, meta, tableRow, putParent, putA, putB); } finally { meta.close(); } @@ -1920,32 +1823,6 @@ public class MetaTableAccessor { } /** - * Updates the progress of pushing entries to peer cluster. Skip entry if value is -1. - * @param connection connection we're using - * @param peerId the peerId to push - * @param positions map that saving positions for each region - * @throws IOException - */ - public static void updateReplicationPositions(Connection connection, String peerId, - Map<String, Long> positions) throws IOException { - List<Put> puts = new ArrayList<>(positions.entrySet().size()); - for (Map.Entry<String, Long> entry : positions.entrySet()) { - Put put = new Put(Bytes.toBytes(entry.getKey())); - put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) - .setRow(put.getRow()) - .setFamily(HConstants.REPLICATION_POSITION_FAMILY) - .setQualifier(Bytes.toBytes(peerId)) - .setTimestamp(put.getTimeStamp()) - .setType(Cell.Type.Put) - .setValue(Bytes.toBytes(Math.abs(entry.getValue()))) - .build()); - puts.add(put); - } - getMetaHTable(connection).put(puts); - } - - - /** * Updates the location of the specified region to be the specified server. * <p> * Connects to the specified server which should be hosting the specified @@ -2163,129 +2040,4 @@ public class MetaTableAccessor { .setValue(Bytes.toBytes(openSeqNum)) .build()); } - - /** - * Get replication position for a peer in a region. - * @param connection connection we're using - * @return the position of this peer, -1 if no position in meta. - */ - public static long getReplicationPositionForOnePeer(Connection connection, - byte[] encodedRegionName, String peerId) throws IOException { - Get get = new Get(encodedRegionName); - get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId)); - Result r = get(getMetaHTable(connection), get); - if (r.isEmpty()) { - return -1; - } - Cell cell = r.rawCells()[0]; - return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()); - } - - /** - * Get replication positions for all peers in a region. - * @param connection connection we're using - * @param encodedRegionName region's encoded name - * @return the map of positions for each peer - */ - public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection, - byte[] encodedRegionName) throws IOException { - Get get = new Get(encodedRegionName); - get.addFamily(HConstants.REPLICATION_POSITION_FAMILY); - Result r = get(getMetaHTable(connection), get); - Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1)); - for (Cell c : r.listCells()) { - map.put( - Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()), - Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength())); - } - return map; - } - - /** - * Get replication barriers for all peers in a region. - * @param encodedRegionName region's encoded name - * @return a list of barrier sequence numbers. - * @throws IOException - */ - public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName) - throws IOException { - Get get = new Get(encodedRegionName); - get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); - Result r = get(getMetaHTable(connection), get); - List<Long> list = new ArrayList<>(); - if (!r.isEmpty()) { - for (Cell cell : r.rawCells()) { - list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength())); - } - } - return list; - } - - /** - * Get all barriers in all regions. - * @return a map of barrier lists in all regions - * @throws IOException - */ - public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException { - Map<String, List<Long>> map = new HashMap<>(); - Scan scan = new Scan(); - scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); - try (Table t = getMetaHTable(connection); - ResultScanner scanner = t.getScanner(scan)) { - Result result; - while ((result = scanner.next()) != null) { - String key = Bytes.toString(result.getRow()); - List<Long> list = new ArrayList<>(result.rawCells().length); - for (Cell cell : result.rawCells()) { - list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength())); - } - map.put(key, list); - } - } - return map; - } - - private static String getSerialReplicationColumnValue(Connection connection, - byte[] encodedRegionName, byte[] columnQualifier) throws IOException { - Get get = new Get(encodedRegionName); - get.addColumn(HConstants.REPLICATION_META_FAMILY, columnQualifier); - Result result = get(getMetaHTable(connection), get); - if (!result.isEmpty()) { - Cell c = result.rawCells()[0]; - return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength()); - } - return null; - } - - /** - * Get daughter region(s) for a region, only used in serial replication. - * @param connection connection we're using - * @param encodedName region's encoded name - */ - public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName) - throws IOException { - return getSerialReplicationColumnValue(connection, encodedName, daughterNameCq); - } - - /** - * Get parent region(s) for a region, only used in serial replication. - * @param connection connection we're using - * @param encodedName region's encoded name - */ - public static String getSerialReplicationParentRegion(Connection connection, byte[] encodedName) - throws IOException { - return getSerialReplicationColumnValue(connection, encodedName, parentNameCq); - } - - /** - * Get the table name for a region, only used in serial replication. - * @param connection connection we're using - * @param encodedName region's encoded name - */ - public static String getSerialReplicationTableName(Connection connection, byte[] encodedName) - throws IOException { - return getSerialReplicationColumnValue(connection, encodedName, tableNameCq); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java index f485c4e..305b352 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java @@ -232,12 +232,6 @@ public interface TableDescriptor { boolean hasRegionMemStoreReplication(); /** - * @return true if there are at least one cf whose replication scope is - * serial. - */ - boolean hasSerialReplicationScope(); - - /** * Check if the compaction enable flag of the table is true. If flag is false * then no minor/major compactions will be done in real. * @@ -285,8 +279,7 @@ public interface TableDescriptor { boolean hasDisabled = false; for (ColumnFamilyDescriptor cf : getColumnFamilies()) { - if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL - && cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { + if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) { hasDisabled = true; } else { hasEnabled = true; http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java index 9f40ae6..c1db64b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java @@ -32,21 +32,20 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.function.Function; import java.util.regex.Matcher; -import java.util.stream.Stream; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + /** * @since 2.0.0 */ @@ -1055,16 +1054,6 @@ public class TableDescriptorBuilder { } /** - * Return true if there are at least one cf whose replication scope is - * serial. - */ - @Override - public boolean hasSerialReplicationScope() { - return Stream.of(getColumnFamilies()) - .anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL); - } - - /** * Returns the configured replicas per region */ @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index c7bf7e2..722dc2a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -78,10 +78,8 @@ public class ReplicationAdmin implements Closeable { // only Global for now, can add other type // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc. public static final String REPLICATIONTYPE = "replicationType"; - public static final String REPLICATIONGLOBAL = - Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL); - public static final String REPLICATIONSERIAL = - Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL); + public static final String REPLICATIONGLOBAL = Integer + .toString(HConstants.REPLICATION_SCOPE_GLOBAL); private final Connection connection; private Admin admin; @@ -356,9 +354,7 @@ public class ReplicationAdmin implements Closeable { HashMap<String, String> replicationEntry = new HashMap<>(); replicationEntry.put(TNAME, table); replicationEntry.put(CFNAME, cf); - replicationEntry.put(REPLICATIONTYPE, - scope == HConstants.REPLICATION_SCOPE_GLOBAL ? REPLICATIONGLOBAL - : REPLICATIONSERIAL); + replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL); replicationColFams.add(replicationEntry); }); }); http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableDescriptorBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableDescriptorBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableDescriptorBuilder.java index 7794a04..f83e13f 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableDescriptorBuilder.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableDescriptorBuilder.java @@ -24,8 +24,9 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.regex.Pattern; -import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -341,26 +342,4 @@ public class TestTableDescriptorBuilder { .build(); assertEquals(42, htd.getPriority()); } - - @Test - public void testSerialReplicationScope() { - HColumnDescriptor hcdWithScope = new HColumnDescriptor(Bytes.toBytes("cf0")); - hcdWithScope.setScope(HConstants.REPLICATION_SCOPE_SERIAL); - HColumnDescriptor hcdWithoutScope = new HColumnDescriptor(Bytes.toBytes("cf1")); - TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) - .addColumnFamily(hcdWithoutScope) - .build(); - assertFalse(htd.hasSerialReplicationScope()); - - htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) - .addColumnFamily(hcdWithScope) - .build(); - assertTrue(htd.hasSerialReplicationScope()); - - htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) - .addColumnFamily(hcdWithScope) - .addColumnFamily(hcdWithoutScope) - .build(); - assertTrue(htd.hasSerialReplicationScope()); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index e6f28bb..adc7194 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -458,27 +458,6 @@ public final class HConstants { /** The catalog family */ public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR); - /** The replication barrier family as a string*/ - public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier"; - - /** The replication barrier family */ - public static final byte [] REPLICATION_BARRIER_FAMILY = - Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR); - - /** The replication position family as a string*/ - public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position"; - - /** The replication position family */ - public static final byte [] REPLICATION_POSITION_FAMILY = - Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR); - - /** The replication meta family as a string*/ - public static final String REPLICATION_META_FAMILY_STR = "rep_meta"; - - /** The replication meta family */ - public static final byte [] REPLICATION_META_FAMILY = - Bytes.toBytes(REPLICATION_META_FAMILY_STR); - /** The RegionInfo qualifier as a string */ public static final String REGIONINFO_QUALIFIER_STR = "regioninfo"; @@ -676,12 +655,6 @@ public final class HConstants { public static final int REPLICATION_SCOPE_GLOBAL = 1; /** - * Scope tag for serially scoped data - * This data will be replicated to all peers by the order of sequence id. - */ - public static final int REPLICATION_SCOPE_SERIAL = 2; - - /** * Default cluster ID, cannot be used to identify a cluster so a key with * this value means it wasn't meant for replication. */ @@ -931,12 +904,6 @@ public final class HConstants { public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false; /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */ public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id"; - - public static final String - REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs"; - public static final long - REPLICATION_SERIALLY_WAITING_DEFAULT = 10000; - /** * Max total size of buffered entries in all replication peers. It will prevent server getting * OOM if there are many peers. Default value is 256MB which is four times to default http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-common/src/main/resources/hbase-default.xml ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index a8a02db..d7e4476 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1658,19 +1658,6 @@ possible configurations would overwhelm and obscure the important. default of 10 will rarely need to be changed. </description> </property> - <property> - <name>hbase.serial.replication.waitingMs</name> - <value>10000</value> - <description> - By default, in replication we can not make sure the order of operations in slave cluster is - same as the order in master. If set REPLICATION_SCOPE to 2, we will push edits by the order - of written. This configuration is to set how long (in ms) we will wait before next checking if - a log can NOT be pushed because there are some logs written before it that have yet to be - pushed. A larger waiting will decrease the number of queries on hbase:meta but will enlarge - the delay of replication. This feature relies on zk-less assignment, so users must set - hbase.assignment.usezk to false to support it. - </description> - </property> <!-- Static Web User Filter properties. --> <property> <name>hbase.http.staticuser.user</name> http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-protocol/src/main/protobuf/WAL.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index 9bf4a79..3912504 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -76,7 +76,6 @@ message WALKey { enum ScopeType { REPLICATION_SCOPE_LOCAL = 0; REPLICATION_SCOPE_GLOBAL = 1; - REPLICATION_SCOPE_SERIAL = 2; } message FamilyScope { http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 9d87182..8e2aa32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -108,7 +108,6 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; -import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore; import org.apache.hadoop.hbase.master.locking.LockManager; @@ -126,7 +125,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; -import org.apache.hadoop.hbase.master.procedure.ModifyNamespaceProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure; @@ -360,7 +358,7 @@ public class HMaster extends HRegionServer implements MasterServices { private ClusterStatusPublisher clusterStatusPublisherChore = null; CatalogJanitor catalogJanitorChore; - private ReplicationMetaCleaner replicationMetaCleaner; + private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore; private LogCleaner logCleaner; private HFileCleaner hfileCleaner; @@ -1163,7 +1161,6 @@ public class HMaster extends HRegionServer implements MasterServices { if (LOG.isTraceEnabled()) { LOG.trace("Started service threads"); } - // Start replication zk node cleaner try { replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval, @@ -1172,8 +1169,6 @@ public class HMaster extends HRegionServer implements MasterServices { } catch (Exception e) { LOG.error("start replicationZKNodeCleanerChore failed", e); } - replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval); - getChoreService().scheduleChore(replicationMetaCleaner); } @Override @@ -1197,7 +1192,6 @@ public class HMaster extends HRegionServer implements MasterServices { if (this.logCleaner != null) this.logCleaner.cancel(true); if (this.hfileCleaner != null) this.hfileCleaner.cancel(true); if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true); - if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true); if (this.quotaManager != null) this.quotaManager.stop(); if (this.activeMasterManager != null) this.activeMasterManager.stop(); http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java index a5c4cf2..ab5c442 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java @@ -199,15 +199,7 @@ public class RegionStateStore { .setValue(Bytes.toBytes(state.name())) .build()); LOG.info(info.toString()); - - final boolean serialReplication = hasSerialReplicationScope(regionInfo.getTable()); - if (serialReplication && state == State.OPEN) { - Put barrierPut = MetaTableAccessor.makeBarrierPut(regionInfo.getEncodedNameAsBytes(), - openSeqNum, regionInfo.getTable().getName()); - updateRegionLocation(regionInfo, state, put, barrierPut); - } else { - updateRegionLocation(regionInfo, state, put); - } + updateRegionLocation(regionInfo, state, put); } protected void updateRegionLocation(final RegionInfo regionInfo, final State state, @@ -238,7 +230,7 @@ public class RegionStateStore { final RegionInfo hriB, final ServerName serverName) throws IOException { final TableDescriptor htd = getTableDescriptor(parent.getTable()); MetaTableAccessor.splitRegion(master.getConnection(), parent, hriA, hriB, serverName, - getRegionReplication(htd), hasSerialReplicationScope(htd)); + getRegionReplication(htd)); } // ============================================================================================ @@ -248,8 +240,7 @@ public class RegionStateStore { final RegionInfo hriB, final ServerName serverName) throws IOException { final TableDescriptor htd = getTableDescriptor(parent.getTable()); MetaTableAccessor.mergeRegions(master.getConnection(), parent, hriA, hriB, serverName, - getRegionReplication(htd), EnvironmentEdgeManager.currentTime(), - hasSerialReplicationScope(htd)); + getRegionReplication(htd), EnvironmentEdgeManager.currentTime()); } // ============================================================================================ @@ -266,14 +257,6 @@ public class RegionStateStore { // ========================================================================== // Table Descriptors helpers // ========================================================================== - private boolean hasSerialReplicationScope(final TableName tableName) throws IOException { - return hasSerialReplicationScope(getTableDescriptor(tableName)); - } - - private boolean hasSerialReplicationScope(final TableDescriptor htd) { - return (htd != null)? htd.hasSerialReplicationScope(): false; - } - private int getRegionReplication(final TableDescriptor htd) { return (htd != null) ? htd.getRegionReplication() : 1; } http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java deleted file mode 100644 index 43a99bd..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.cleaner; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * This chore is to clean up the useless data in hbase:meta which is used by serial replication. - */ -@InterfaceAudience.Private -public class ReplicationMetaCleaner extends ScheduledChore { - - private static final Logger LOG = LoggerFactory.getLogger(ReplicationMetaCleaner.class); - - private final Admin admin; - private final MasterServices master; - - public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period) - throws IOException { - super("ReplicationMetaCleaner", stoppable, period); - this.master = master; - admin = master.getConnection().getAdmin(); - } - - @Override - protected void chore() { - try { - Map<String, TableDescriptor> tables = master.getTableDescriptors().getAllDescriptors(); - Map<String, Set<String>> serialTables = new HashMap<>(); - for (Map.Entry<String, TableDescriptor> entry : tables.entrySet()) { - boolean hasSerialScope = false; - for (ColumnFamilyDescriptor column : entry.getValue().getColumnFamilies()) { - if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) { - hasSerialScope = true; - break; - } - } - if (hasSerialScope) { - serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<>()); - } - } - if (serialTables.isEmpty()){ - return; - } - - List<ReplicationPeerDescription> peers = admin.listReplicationPeers(); - for (ReplicationPeerDescription peerDesc : peers) { - Map<TableName, List<String>> tableCFsMap = peerDesc.getPeerConfig().getTableCFsMap(); - if (tableCFsMap ==null) { - continue; - } - - for (Map.Entry<TableName, List<String>> map : tableCFsMap.entrySet()) { - if (serialTables.containsKey(map.getKey().getNameAsString())) { - serialTables.get(map.getKey().getNameAsString()).add(peerDesc.getPeerId()); - break; - } - } - } - - Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection()); - for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) { - String encodedName = entry.getKey(); - byte[] encodedBytes = Bytes.toBytes(encodedName); - boolean canClearRegion = false; - Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer( - master.getConnection(), encodedBytes); - if (posMap.isEmpty()) { - continue; - } - - String tableName = MetaTableAccessor.getSerialReplicationTableName( - master.getConnection(), encodedBytes); - Set<String> confPeers = serialTables.get(tableName); - if (confPeers == null) { - // This table doesn't exist or all cf's scope is not serial any more, we can clear meta. - canClearRegion = true; - } else { - if (!allPeersHavePosition(confPeers, posMap)) { - continue; - } - - String daughterValue = MetaTableAccessor - .getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes); - if (daughterValue != null) { - //this region is merged or split - boolean allDaughterStart = true; - String[] daughterRegions = daughterValue.split(","); - for (String daughter : daughterRegions) { - byte[] region = Bytes.toBytes(daughter); - if (!MetaTableAccessor.getReplicationBarriers( - master.getConnection(), region).isEmpty() && - !allPeersHavePosition(confPeers, - MetaTableAccessor - .getReplicationPositionForAllPeer(master.getConnection(), region))) { - allDaughterStart = false; - break; - } - } - if (allDaughterStart) { - canClearRegion = true; - } - } - } - if (canClearRegion) { - Delete delete = new Delete(encodedBytes); - delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY); - delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); - delete.addFamily(HConstants.REPLICATION_META_FAMILY); - try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) { - metaTable.delete(delete); - } - } else { - - // Barriers whose seq is larger than min pos of all peers, and the last barrier whose seq - // is smaller than min pos should be kept. All other barriers can be deleted. - - long minPos = Long.MAX_VALUE; - for (Map.Entry<String, Long> pos : posMap.entrySet()) { - minPos = Math.min(minPos, pos.getValue()); - } - List<Long> barriers = entry.getValue(); - int index = Collections.binarySearch(barriers, minPos); - if (index < 0) { - index = -index - 1; - } - Delete delete = new Delete(encodedBytes); - for (int i = 0; i < index - 1; i++) { - delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i))); - } - try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) { - metaTable.delete(delete); - } - } - - } - - } catch (IOException e) { - LOG.error("Exception during cleaning up.", e); - } - - } - - private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap) - throws IOException { - for(String peer:peers){ - if (!posMap.containsKey(peer)){ - return false; - } - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7e5f277..f26e2cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionUtils; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; @@ -3158,54 +3157,34 @@ public class HRegionServer extends HasThread implements * @return true if closed the region successfully. * @throws IOException */ - protected boolean closeAndOfflineRegionForSplitOrMerge( - final List<String> regionEncodedName) throws IOException { - for (int i = 0; i < regionEncodedName.size(); ++i) { - HRegion regionToClose = this.getRegion(regionEncodedName.get(i)); - if (regionToClose != null) { - Map<byte[], List<HStoreFile>> hstoreFiles = null; - Exception exceptionToThrow = null; - try{ - hstoreFiles = regionToClose.close(false); - } catch (Exception e) { - exceptionToThrow = e; - } - if (exceptionToThrow == null && hstoreFiles == null) { - // The region was closed by someone else - exceptionToThrow = - new IOException("Failed to close region: already closed by another thread"); - } - - if (exceptionToThrow != null) { - if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow; - throw new IOException(exceptionToThrow); - } - if (regionToClose.getTableDescriptor().hasSerialReplicationScope()) { - // For serial replication, we need add a final barrier on this region. But the splitting - // or merging may be reverted, so we should make sure if we reopen this region, the open - // barrier is same as this final barrier - long seq = regionToClose.getMaxFlushedSeqId(); - if (seq == HConstants.NO_SEQNUM) { - // No edits in WAL for this region; get the sequence number when the region was opened. - seq = regionToClose.getOpenSeqNum(); - if (seq == HConstants.NO_SEQNUM) { - // This region has no data - seq = 0; - } - } else { - seq++; - } - Put finalBarrier = MetaTableAccessor.makeBarrierPut( - Bytes.toBytes(regionEncodedName.get(i)), - seq, - regionToClose.getTableDescriptor().getTableName().getName()); - MetaTableAccessor.putToMetaTable(getConnection(), finalBarrier); - } - // Offline the region - this.removeRegion(regionToClose, null); - } - } - return true; + protected boolean closeAndOfflineRegionForSplitOrMerge(final List<String> regionEncodedName) + throws IOException { + for (int i = 0; i < regionEncodedName.size(); ++i) { + HRegion regionToClose = this.getRegion(regionEncodedName.get(i)); + if (regionToClose != null) { + Map<byte[], List<HStoreFile>> hstoreFiles = null; + Exception exceptionToThrow = null; + try { + hstoreFiles = regionToClose.close(false); + } catch (Exception e) { + exceptionToThrow = e; + } + if (exceptionToThrow == null && hstoreFiles == null) { + // The region was closed by someone else + exceptionToThrow = + new IOException("Failed to close region: already closed by another thread"); + } + if (exceptionToThrow != null) { + if (exceptionToThrow instanceof IOException) { + throw (IOException) exceptionToThrow; + } + throw new IOException(exceptionToThrow); + } + // Offline the region + this.removeRegion(regionToClose, null); + } + } + return true; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 4e88df0..6edeaed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -117,6 +117,7 @@ class FSWALEntry extends Entry { PrivateCellUtil.setSequenceId(c, regionSequenceId); } } + getKey().setWriteEntry(we); return regionSequenceId; } http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 630b90b..66b8881 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -74,8 +74,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper try { WALEntryBatch entryBatch = entryReader.take(); shipEdits(entryBatch); - if (entryBatch.getWalEntries().isEmpty() - && entryBatch.getLastSeqIds().isEmpty()) { + if (entryBatch.getWalEntries().isEmpty()) { LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " + source.getPeerClusterZnode()); source.getSourceMetrics().incrCompletedRecoveryQueue(); http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 8346824..2cf3a82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -44,12 +43,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @@ -62,7 +58,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -124,8 +119,6 @@ public class ReplicationSourceManager implements ReplicationListener { private final boolean replicationForBulkLoadDataEnabled; - private Connection connection; - private long replicationWaitTime; private AtomicLong totalBufferUsed = new AtomicLong(); @@ -177,12 +170,8 @@ public class ReplicationSourceManager implements ReplicationListener { tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); this.latestPaths = new HashSet<Path>(); - replicationForBulkLoadDataEnabled = - conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, - HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); - this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, - HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT); - connection = ConnectionFactory.createConnection(conf); + replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); } /** @@ -848,10 +837,6 @@ public class ReplicationSourceManager implements ReplicationListener { return this.fs; } - public Connection getConnection() { - return this.connection; - } - /** * Get the ReplicationPeers used by this ReplicationSourceManager * @return the ReplicationPeers used by this ReplicationSourceManager @@ -884,106 +869,4 @@ public class ReplicationSourceManager implements ReplicationListener { public void cleanUpHFileRefs(String peerId, List<String> files) { this.replicationQueues.removeHFileRefs(peerId, files); } - - /** - * Whether an entry can be pushed to the peer or not right now. - * If we enable serial replication, we can not push the entry until all entries in its region - * whose sequence numbers are smaller than this entry have been pushed. - * For each ReplicationSource, we need only check the first entry in each region, as long as it - * can be pushed, we can push all in this ReplicationSource. - * This method will be blocked until we can push. - * @return the first barrier of entry's region, or -1 if there is no barrier. It is used to - * prevent saving positions in the region of no barrier. - */ - void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId) - throws IOException, InterruptedException { - /** - * There are barriers for this region and position for this peer. N barriers form N intervals, - * (b1,b2) (b2,b3) ... (bn,max). Generally, there is no logs whose seq id is not greater than - * the first barrier and the last interval is start from the last barrier. - * - * There are several conditions that we can push now, otherwise we should block: - * 1) "Serial replication" is not enabled, we can push all logs just like before. This case - * should not call this method. - * 2) There is no barriers for this region, or the seq id is smaller than the first barrier. - * It is mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the - * order of logs that is written before altering. - * 3) This entry is in the first interval of barriers. We can push them because it is the - * start of a region. But if the region is created by region split, we should check - * if the parent regions are fully pushed. - * 4) If the entry's seq id and the position are in same section, or the pos is the last - * number of previous section. Because when open a region we put a barrier the number - * is the last log's id + 1. - * 5) Log's seq is smaller than pos in meta, we are retrying. It may happen when a RS crashes - * after save replication meta and before save zk offset. - */ - List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName); - if (barriers.isEmpty() || seq <= barriers.get(0)) { - // Case 2 - return; - } - int interval = Collections.binarySearch(barriers, seq); - if (interval < 0) { - interval = -interval - 1;// get the insert position if negative - } - if (interval == 1) { - // Case 3 - // Check if there are parent regions - String parentValue = MetaTableAccessor.getSerialReplicationParentRegion(connection, - encodedName); - if (parentValue == null) { - // This region has no parent or the parent's log entries are fully pushed. - return; - } - while (true) { - boolean allParentDone = true; - String[] parentRegions = parentValue.split(","); - for (String parent : parentRegions) { - byte[] region = Bytes.toBytes(parent); - long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, region, peerId); - List<Long> parentBarriers = MetaTableAccessor.getReplicationBarriers(connection, region); - if (parentBarriers.size() > 0 - && parentBarriers.get(parentBarriers.size() - 1) - 1 > pos) { - allParentDone = false; - // For a closed region, we will write a close event marker to WAL whose sequence id is - // larger than final barrier but still smaller than next region's openSeqNum. - // So if the pos is larger than last barrier, we can say we have read the event marker - // which means the parent region has been fully pushed. - LOG.info(Bytes.toString(encodedName) + " can not start pushing because parent region's" - + " log has not been fully pushed: parent=" + Bytes.toString(region) + " pos=" + pos - + " barriers=" + Arrays.toString(barriers.toArray())); - break; - } - } - if (allParentDone) { - return; - } else { - Thread.sleep(replicationWaitTime); - } - } - - } - - while (true) { - long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId); - if (seq <= pos) { - // Case 5 - } - if (pos >= 0) { - // Case 4 - int posInterval = Collections.binarySearch(barriers, pos); - if (posInterval < 0) { - posInterval = -posInterval - 1;// get the insert position if negative - } - if (posInterval == interval || pos == barriers.get(interval - 1) - 1) { - return; - } - } - - LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId - + " because previous log has not been pushed: sequence=" + seq + " pos=" + pos - + " barriers=" + Arrays.toString(barriers.toArray())); - Thread.sleep(replicationWaitTime); - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index ea98cda..ced2980 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -20,31 +20,23 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; -import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; -import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; /** * This thread reads entries from a queue and ships them. Entries are placed onto the queue by @@ -79,17 +71,6 @@ public class ReplicationSourceShipper extends Thread { // Maximum number of retries before taking bold actions protected final int maxRetriesMultiplier; - // Use guava cache to set ttl for each key - private final LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder() - .expireAfterAccess(1, TimeUnit.DAYS).build( - new CacheLoader<String, Boolean>() { - @Override - public Boolean load(String key) throws Exception { - return false; - } - } - ); - public ReplicationSourceShipper(Configuration conf, String walGroupId, PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) { this.conf = conf; @@ -125,9 +106,6 @@ public class ReplicationSourceShipper extends Thread { try { WALEntryBatch entryBatch = entryReader.take(); - for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) { - waitingUntilCanPush(entry); - } shipEdits(entryBatch); } catch (InterruptedException e) { LOG.trace("Interrupted while waiting for next replication entry batch", e); @@ -150,8 +128,6 @@ public class ReplicationSourceShipper extends Thread { int sleepMultiplier = 0; if (entries.isEmpty()) { if (lastLoggedPosition != lastReadPosition) { - // Save positions to meta table before zk. - updateSerialRepPositions(entryBatch.getLastSeqIds()); updateLogPosition(lastReadPosition); // if there was nothing to ship and it's not an error // set "ageOfLastShippedOp" to <now> to indicate that we're current @@ -197,9 +173,6 @@ public class ReplicationSourceShipper extends Thread { for (int i = 0; i < size; i++) { cleanUpHFileRefs(entries.get(i).getEdit()); } - - // Save positions to meta table before zk. - updateSerialRepPositions(entryBatch.getLastSeqIds()); //Log and clean up WAL logs updateLogPosition(lastReadPosition); } @@ -225,33 +198,6 @@ public class ReplicationSourceShipper extends Thread { } } - private void waitingUntilCanPush(Map.Entry<String, Long> entry) { - String key = entry.getKey(); - long seq = entry.getValue(); - boolean deleteKey = false; - if (seq <= 0) { - // There is a REGION_CLOSE marker, we can not continue skipping after this entry. - deleteKey = true; - seq = -seq; - } - - if (!canSkipWaitingSet.getUnchecked(key)) { - try { - source.getSourceManager().waitUntilCanBePushed(Bytes.toBytes(key), seq, source.getPeerId()); - } catch (IOException e) { - LOG.error("waitUntilCanBePushed fail", e); - throw new RuntimeException("waitUntilCanBePushed fail"); - } catch (InterruptedException e) { - LOG.warn("waitUntilCanBePushed interrupted", e); - Thread.currentThread().interrupt(); - } - canSkipWaitingSet.put(key, true); - } - if (deleteKey) { - canSkipWaitingSet.invalidate(key); - } - } - private void cleanUpHFileRefs(WALEdit edit) throws IOException { String peerId = source.getPeerId(); if (peerId.contains("-")) { @@ -282,16 +228,6 @@ public class ReplicationSourceShipper extends Thread { lastLoggedPosition = lastReadPosition; } - private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) { - try { - MetaTableAccessor.updateReplicationPositions(source.getSourceManager().getConnection(), - source.getPeerId(), lastPositionsForSerialScope); - } catch (IOException e) { - LOG.error("updateReplicationPositions fail", e); - throw new RuntimeException("updateReplicationPositions fail"); - } - } - public void startup(UncaughtExceptionHandler handler) { String name = Thread.currentThread().getName(); Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + "," http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index e56fab2..b6b50ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -21,14 +21,11 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,7 +33,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -45,7 +41,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; @@ -135,7 +131,7 @@ public class ReplicationSourceWALReader extends Thread { continue; } WALEntryBatch batch = readWALEntries(entryStream); - if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) { + if (batch != null && batch.getNbEntries() > 0) { if (LOG.isTraceEnabled()) { LOG.trace(String.format("Read %s WAL entries eligible for replication", batch.getNbEntries())); @@ -171,10 +167,6 @@ public class ReplicationSourceWALReader extends Thread { batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); } Entry entry = entryStream.next(); - if (updateSerialReplPos(batch, entry)) { - batch.lastWalPosition = entryStream.getPosition(); - break; - } entry = filterEntry(entry); if (entry != null) { WALEdit edit = entry.getEdit(); @@ -247,33 +239,6 @@ public class ReplicationSourceWALReader extends Thread { } /** - * @return true if we should stop reading because we're at REGION_CLOSE - */ - private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException { - if (entry.hasSerialReplicationScope()) { - String key = Bytes.toString(entry.getKey().getEncodedRegionName()); - batch.setLastPosition(key, entry.getKey().getSequenceId()); - if (!entry.getEdit().getCells().isEmpty()) { - WALProtos.RegionEventDescriptor maybeEvent = - WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); - if (maybeEvent != null && maybeEvent - .getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) { - // In serially replication, if we move a region to another RS and move it back, we may - // read logs crossing two sections. We should break at REGION_CLOSE and push the first - // section first in case of missing the middle section belonging to the other RS. - // In a worker thread, if we can push the first log of a region, we can push all logs - // in the same region without waiting until we read a close marker because next time - // we read logs in this region, it must be a new section and not adjacent with this - // region. Mark it negative. - batch.setLastPosition(key, -entry.getKey().getSequenceId()); - return true; - } - } - } - return false; - } - - /** * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a * batch to become available * @return A batch of entries, along with the position in the log after reading the batch @@ -407,8 +372,6 @@ public class ReplicationSourceWALReader extends Thread { private int nbHFiles = 0; // heap size of data we need to replicate private long heapSize = 0; - // save the last sequenceid for each region if the table has serial-replication scope - private Map<String, Long> lastSeqIds = new HashMap<>(); /** * @param walEntries @@ -477,13 +440,6 @@ public class ReplicationSourceWALReader extends Thread { return heapSize; } - /** - * @return the last sequenceid for each region if the table has serial-replication scope - */ - public Map<String, Long> getLastSeqIds() { - return lastSeqIds; - } - private void incrementNbRowKeys(int increment) { nbRowKeys += increment; } @@ -495,9 +451,5 @@ public class ReplicationSourceWALReader extends Thread { private void incrementHeapSize(long increment) { heapSize += increment; } - - private void setLastPosition(String region, Long sequenceId) { - getLastSeqIds().put(region, sequenceId); - } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java index c4f0e25..179dfe5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java @@ -405,7 +405,7 @@ public class RestoreSnapshotHelper { } LOG.debug("Update splits parent " + regionInfo.getEncodedName() + " -> " + daughters); - MetaTableAccessor.addSpiltsToParent(connection, regionInfo, + MetaTableAccessor.addSplitsToParent(connection, regionInfo, regionsByName.get(daughters.getFirst()), regionsByName.get(daughters.getSecond())); } http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java index 49ed11a..c72b9e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java @@ -159,36 +159,6 @@ public class FSTableDescriptors implements TableDescriptors { // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. .setBloomFilterType(BloomType.NONE) .build()) - .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_BARRIER_FAMILY) - .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS, - HConstants.DEFAULT_HBASE_META_VERSIONS)) - .setInMemory(true) - .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, - HConstants.DEFAULT_HBASE_META_BLOCK_SIZE)) - .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. - .setBloomFilterType(BloomType.NONE) - .build()) - .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_POSITION_FAMILY) - .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS, - HConstants.DEFAULT_HBASE_META_VERSIONS)) - .setInMemory(true) - .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, - HConstants.DEFAULT_HBASE_META_BLOCK_SIZE)) - .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. - .setBloomFilterType(BloomType.NONE) - .build()) - .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_META_FAMILY) - .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS, - HConstants.DEFAULT_HBASE_META_VERSIONS)) - .setInMemory(true) - .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, - HConstants.DEFAULT_HBASE_META_BLOCK_SIZE)) - .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. - .setBloomFilterType(BloomType.NONE) - .build()) .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.TABLE_FAMILY) .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS)) http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index d478e4f..db6c411 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,14 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.wal; import java.io.Closeable; import java.io.IOException; import java.util.Map; import java.util.Set; - import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; @@ -36,8 +33,6 @@ import org.apache.yetus.audience.InterfaceStability; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -// imports we use from yet-to-be-moved regionsever.wal - /** * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides * APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc). @@ -276,18 +271,6 @@ public interface WAL extends Closeable, WALFileLengthProvider { key.setCompressionContext(compressionContext); } - public boolean hasSerialReplicationScope () { - if (getKey().getReplicationScopes() == null || getKey().getReplicationScopes().isEmpty()) { - return false; - } - for (Map.Entry<byte[], Integer> e:getKey().getReplicationScopes().entrySet()) { - if (e.getValue() == HConstants.REPLICATION_SCOPE_SERIAL){ - return true; - } - } - return false; - } - @Override public String toString() { return this.key + "=" + this.edit; http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index 3831e9c..609c54e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -496,7 +496,7 @@ public class TestMetaTableAccessor { List<RegionInfo> regionInfos = Lists.newArrayList(parent); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false); + MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3); assertEmptyMetaLocation(meta, splitA.getRegionName(), 1); assertEmptyMetaLocation(meta, splitA.getRegionName(), 2); @@ -541,7 +541,7 @@ public class TestMetaTableAccessor { MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3, - HConstants.LATEST_TIMESTAMP, false); + HConstants.LATEST_TIMESTAMP); assertEmptyMetaLocation(meta, merged.getRegionName(), 1); assertEmptyMetaLocation(meta, merged.getRegionName(), 2); @@ -691,7 +691,7 @@ public class TestMetaTableAccessor { // now merge the regions, effectively deleting the rows for region a and b. MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, - regionInfoA, regionInfoB, sn, 1, masterSystemTime, false); + regionInfoA, regionInfoB, sn, 1, masterSystemTime); result = meta.get(get); serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY, @@ -782,7 +782,7 @@ public class TestMetaTableAccessor { } SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler(); long prevCalls = scheduler.numPriorityCalls; - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB,loc.getServerName(),1,false); + MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1); assertTrue(prevCalls < scheduler.numPriorityCalls); } @@ -819,7 +819,7 @@ public class TestMetaTableAccessor { List<RegionInfo> regionInfos = Lists.newArrayList(parent); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false); + MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3); Get get1 = new Get(splitA.getRegionName()); Result resultA = meta.get(get1); Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY, http://git-wip-us.apache.org/repos/asf/hbase/blob/4ddfdaff/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index 3f01043..6af72ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -348,7 +348,7 @@ public class TestRegionServerMetrics { TEST_UTIL.getAdmin().flush(tableName); metricsRegionServer.getRegionServerWrapper().forceRecompute(); - assertGauge("storeCount", TABLES_ON_MASTER? 1: 7); + assertGauge("storeCount", TABLES_ON_MASTER? 1: 4); assertGauge("storeFileCount", 1); }