http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java new file mode 100644 index 0000000..31c3ac7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Holds a batch of WAL entries to replicate, along with some statistics + */ +@InterfaceAudience.Private +class WALEntryBatch { + private List<Entry> walEntries; + // last WAL that was read + private Path lastWalPath; + // position in WAL of last entry in this batch + private long lastWalPosition = 0; + // number of distinct row keys in this batch + private int nbRowKeys = 0; + // number of HFiles + private int nbHFiles = 0; + // heap size of data we need to replicate + private long heapSize = 0; + // save the last sequenceid for each region if the table has serial-replication scope + private Map<String, Long> lastSeqIds = new HashMap<>(); + + /** + * @param lastWalPath Path of the WAL the last entry in this batch was read from + */ + WALEntryBatch(int maxNbEntries, Path lastWalPath) { + this.walEntries = new ArrayList<>(maxNbEntries); + this.lastWalPath = lastWalPath; + } + + public void addEntry(Entry entry) { + walEntries.add(entry); + } + + /** + * @return the WAL Entries. + */ + public List<Entry> getWalEntries() { + return walEntries; + } + + /** + * @return the path of the last WAL that was read. + */ + public Path getLastWalPath() { + return lastWalPath; + } + + /** + * @return the position in the last WAL that was read. 
+ */ + public long getLastWalPosition() { + return lastWalPosition; + } + + public void setLastWalPosition(long lastWalPosition) { + this.lastWalPosition = lastWalPosition; + } + + public int getNbEntries() { + return walEntries.size(); + } + + /** + * @return the number of distinct row keys in this batch + */ + public int getNbRowKeys() { + return nbRowKeys; + } + + /** + * @return the number of HFiles in this batch + */ + public int getNbHFiles() { + return nbHFiles; + } + + /** + * @return total number of operations in this batch + */ + public int getNbOperations() { + return getNbRowKeys() + getNbHFiles(); + } + + /** + * @return the heap size of this batch + */ + public long getHeapSize() { + return heapSize; + } + + /** + * @return the last sequenceid for each region if the table has serial-replication scope + */ + public Map<String, Long> getLastSeqIds() { + return lastSeqIds; + } + + public void incrementNbRowKeys(int increment) { + nbRowKeys += increment; + } + + public void incrementNbHFiles(int increment) { + nbHFiles += increment; + } + + public void incrementHeapSize(long increment) { + heapSize += increment; + } + + public void setLastSeqId(String region, long sequenceId) { + lastSeqIds.put(region, sequenceId); + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 7c83c0c..bcab9b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -21,29 +21,26 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.NoSuchElementException; import java.util.OptionalLong; import java.util.concurrent.PriorityBlockingQueue; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; /** * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually @@ -102,16 +99,18 @@ class WALEntryStream implements Closeable { } /** - * @return the next WAL entry in this stream - * @throws IOException - * @throws NoSuchElementException if no more entries in the stream. + * Returns the next WAL entry in this stream, or null if there is none, but does not advance. + */ + public Entry peek() throws IOException { + return hasNext() ? currentEntry: null; + } + + /** + * Returns the next WAL entry in this stream, or null if there is none, and advances the stream. */ public Entry next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); - } - Entry save = currentEntry; - currentEntry = null; // gets reloaded by hasNext() + Entry save = peek(); + currentEntry = null; return save; } http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java index a67bca1..85292f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java @@ -170,6 +170,14 @@ public class FSTableDescriptors implements TableDescriptors { // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. .setBloomFilterType(BloomType.NONE) .build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(HConstants.REPLICATION_BARRIER_FAMILY) + .setMaxVersions(HConstants.ALL_VERSIONS) + .setInMemory(true) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL) + // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. 
+ .setBloomFilterType(BloomType.NONE) + .build()) .setCoprocessor(CoprocessorDescriptorBuilder.newBuilder( MultiRowMutationEndpoint.class.getName()) .setPriority(Coprocessor.PRIORITY_SYSTEM) http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index c0b72aa..b106a31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -18,13 +18,7 @@ */ package org.apache.hadoop.hbase.util; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; -import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; -import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; - import edu.umd.cs.findbugs.annotations.CheckForNull; - import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.EOFException; @@ -54,7 +48,6 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; @@ -71,9 +64,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.exceptions.DeserializationException; import 
org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.HFileLink; @@ -81,8 +72,6 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.security.AccessDeniedException; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; @@ -94,6 +83,17 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; +import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; +import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos; /** * Utility methods for interacting with the underlying file system. @@ -1028,6 +1028,10 @@ public abstract class FSUtils extends CommonFSUtils { return regionDirs; } + public static Path getRegionDir(Path tableDir, RegionInfo region) { + return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); + } + /** * Filter for all dirs that are legal column family names. This is generally used for colfam * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java index c1a77ee..ac23d1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java @@ -415,10 +415,16 @@ public class WALKeyImpl implements WALKey { this.replicationScope = replicationScope; } - public void serializeReplicationScope(boolean serialize) { - if (!serialize) { - setReplicationScope(null); + public void clearReplicationScope() { + setReplicationScope(null); + } + + public boolean hasSerialReplicationScope() { + if (replicationScope == null || replicationScope.isEmpty()) { + return false; } + return replicationScope.values().stream() + .anyMatch(scope -> scope.intValue() == HConstants.REPLICATION_SCOPE_SERIAL); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index ec93207..9161e25 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -494,7 +494,7 @@ public class TestMetaTableAccessor { List<RegionInfo> regionInfos = Lists.newArrayList(parent); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3); + MetaTableAccessor.splitRegion(connection, parent, -1L, 
splitA, splitB, serverName0, 3); assertEmptyMetaLocation(meta, splitA.getRegionName(), 1); assertEmptyMetaLocation(meta, splitA.getRegionName(), 2); @@ -535,7 +535,8 @@ public class TestMetaTableAccessor { List<RegionInfo> regionInfos = Lists.newArrayList(parentA, parentB); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); - MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3); + MetaTableAccessor.mergeRegions(connection, merged, parentA, -1L, parentB, -1L, serverName0, + 3); assertEmptyMetaLocation(meta, merged.getRegionName(), 1); assertEmptyMetaLocation(meta, merged.getRegionName(), 2); @@ -682,8 +683,8 @@ public class TestMetaTableAccessor { EnvironmentEdgeManager.injectEdge(edge); try { // now merge the regions, effectively deleting the rows for region a and b. - MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, regionInfoB, sn, - 1); + MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, -1L, regionInfoB, + -1L, sn, 1); } finally { EnvironmentEdgeManager.reset(); } @@ -776,7 +777,8 @@ public class TestMetaTableAccessor { } SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler(); long prevCalls = scheduler.numPriorityCalls; - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1); + MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, loc.getServerName(), + 1); assertTrue(prevCalls < scheduler.numPriorityCalls); } @@ -813,7 +815,7 @@ public class TestMetaTableAccessor { List<RegionInfo> regionInfos = Lists.newArrayList(parent); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3); + MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3); Get get1 = new Get(splitA.getRegionName()); Result resultA = meta.get(get1); Cell serverCellA = 
resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY, http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java index a4e8e19..e00f072 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java @@ -194,7 +194,7 @@ public class TestHRegionFileSystem { @Test public void testOnDiskRegionCreation() throws IOException { - Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation"); + Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName()); FileSystem fs = TEST_UTIL.getTestFileSystem(); Configuration conf = TEST_UTIL.getConfiguration(); @@ -226,7 +226,7 @@ public class TestHRegionFileSystem { @Test public void testNonIdempotentOpsWithRetries() throws IOException { - Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation"); + Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName()); FileSystem fs = TEST_UTIL.getTestFileSystem(); Configuration conf = TEST_UTIL.getConfiguration(); @@ -235,19 +235,15 @@ public class TestHRegionFileSystem { HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, rootDir, hri); assertTrue(fs.exists(regionFs.getRegionDir())); - regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(), - null, null); - // HRegionFileSystem.createRegionOnFileSystem(conf, new MockFileSystemForCreate(), rootDir, - // hri); + regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(), rootDir, hri); boolean result = regionFs.createDir(new Path("/foo/bar")); 
assertTrue("Couldn't create the directory", result); - - regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null); + regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri); result = regionFs.rename(new Path("/foo/bar"), new Path("/foo/bar2")); assertTrue("Couldn't rename the directory", result); - regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null); + regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri); result = regionFs.deleteDir(new Path("/foo/bar")); assertTrue("Couldn't delete the directory", result); fs.delete(rootDir, true); http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index 50dffd5..fab6512 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -343,12 +343,12 @@ public class TestRegionServerMetrics { @Test public void testStoreCount() throws Exception { - //Force a hfile. + // Force a hfile. doNPuts(1, false); TEST_UTIL.getAdmin().flush(tableName); metricsRegionServer.getRegionServerWrapper().forceRecompute(); - assertGauge("storeCount", TABLES_ON_MASTER? 1: 4); + assertGauge("storeCount", TABLES_ON_MASTER ? 
1 : 5); assertGauge("storeFileCount", 1); } http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java index ffa03a2..e9e92b8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.fail; -import java.util.ArrayList; -import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -38,6 +36,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.junit.Before; @@ -47,7 +46,7 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Category(LargeTests.class) +@Category({ ReplicationTests.class, LargeTests.class }) public class TestReplicationDroppedTables extends TestReplicationBase { @ClassRule @@ -56,9 +55,6 @@ public class TestReplicationDroppedTables extends TestReplicationBase { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class); - /** - * @throws java.lang.Exception - */ 
@Before public void setUp() throws Exception { // Starting and stopping replication can make us miss new logs, http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java new file mode 100644 index 0000000..1408cf0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.UUID; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestSerialReplication { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSerialReplication.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static String PEER_ID = "1"; + + private static byte[] CF = 
Bytes.toBytes("CF"); + + private static byte[] CQ = Bytes.toBytes("CQ"); + + private static FileSystem FS; + + private static Path LOG_DIR; + + private static WALProvider.Writer WRITER; + + public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { + + private static final UUID PEER_UUID = UUID.randomUUID(); + + @Override + public UUID getPeerUUID() { + return PEER_UUID; + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + synchronized (WRITER) { + try { + for (Entry entry : replicateContext.getEntries()) { + WRITER.append(entry); + } + WRITER.sync(false); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return true; + } + + @Override + public void start() { + startAsync(); + } + + @Override + public void stop() { + stopAsync(); + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + notifyStopped(); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10); + UTIL.startMiniCluster(3); + LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated"); + FS = UTIL.getTestFileSystem(); + FS.mkdirs(LOG_DIR); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Rule + public final TestName name = new TestName(); + + private Path logPath; + + @Before + public void setUp() throws IOException, StreamLacksCapabilityException { + UTIL.ensureSomeRegionServersAvailable(3); + logPath = new Path(LOG_DIR, name.getMethodName()); + WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); + // add in disable state, so later when enabling it all sources will start push together. 
+ UTIL.getAdmin().addReplicationPeer(PEER_ID, + ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") + .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(), + false); + } + + @After + public void tearDown() throws IOException { + UTIL.getAdmin().removeReplicationPeer(PEER_ID); + if (WRITER != null) { + WRITER.close(); + WRITER = null; + } + } + + @Test + public void testRegionMove() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + UTIL.getAdmin().createTable( + TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build()); + UTIL.waitTableAvailable(tableName); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); + HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)); + UTIL.getAdmin().move(region.getEncodedNameAsBytes(), + Bytes.toBytes(rs.getServerName().getServerName())); + UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return !rs.getRegions(tableName).isEmpty(); + } + + @Override + public String explainFailure() throws Exception { + return region + " is still not on " + rs; + } + }); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 100; i < 200; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + UTIL.getAdmin().enableReplicationPeer(PEER_ID); + UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) { + int count = 0; + while (reader.next() != 
null) { + count++; + } + return count >= 200; + } catch (IOException e) { + return false; + } + } + + @Override + public String explainFailure() throws Exception { + return "Not enough entries replicated"; + } + }); + try (WAL.Reader reader = + WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { + long seqId = -1L; + int count = 0; + for (Entry entry;;) { + entry = reader.next(); + if (entry == null) { + break; + } + long current = entry.getKey().getSequenceId(); + assertTrue("Sequence id go backwards from " + seqId + " to " + current, current >= seqId); + seqId = current; + count++; + } + assertEquals(200, count); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index a53cba3..6d75fec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -321,7 +321,7 @@ public abstract class TestReplicationSourceManager { wal.rollWriter(); manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), - "1", 0, false); + "1", 0, null, false); wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java ---------------------------------------------------------------------- diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java new file mode 100644 index 0000000..c8387c5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Cell.Type; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestSerialReplicationChecker { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + 
HBaseClassTestRule.forClass(TestSerialReplicationChecker.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static String PEER_ID = "1"; + + private static ReplicationQueueStorage QUEUE_STORAGE; + + private static String WAL_FILE_NAME = "test.wal"; + + private SerialReplicationChecker checker; + + @Rule + public final TestName name = new TestName(); + + private TableName tableName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(1); + QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), + UTIL.getConfiguration()); + QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID, + WAL_FILE_NAME); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws IOException { + ReplicationSource source = mock(ReplicationSource.class); + when(source.getPeerId()).thenReturn(PEER_ID); + when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE); + Server server = mock(Server.class); + when(server.getConnection()).thenReturn(UTIL.getConnection()); + when(source.getServer()).thenReturn(server); + checker = new SerialReplicationChecker(UTIL.getConfiguration(), source); + tableName = TableName.valueOf(name.getMethodName()); + } + + private Entry createEntry(RegionInfo region, long seqId) { + WALKeyImpl key = mock(WALKeyImpl.class); + when(key.getTableName()).thenReturn(tableName); + when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes()); + when(key.getSequenceId()).thenReturn(seqId); + Entry entry = mock(Entry.class); + when(entry.getKey()).thenReturn(key); + return entry; + } + + private Cell createCell(RegionInfo region) { + return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey()) + .setType(Type.Put).build(); + } + + @Test + public void testNoBarrierCanPush() 
throws IOException { + RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); + assertTrue(checker.canPush(createEntry(region, 100), createCell(region))); + } + + private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers) + throws IOException { + Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); + put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, + Bytes.toBytes(state.name())); + for (int i = 0; i < barriers.length; i++) { + put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER, + put.getTimeStamp() - i, Bytes.toBytes(barriers[i])); + } + try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { + table.put(put); + } + } + + private void setState(RegionInfo region, RegionState.State state) throws IOException { + Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); + put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, + Bytes.toBytes(state.name())); + try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { + table.put(put); + } + } + + private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException { + QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), + PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId)); + } + + @Test + public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException { + RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); + addStateAndBarrier(region, RegionState.State.OPEN, 10); + Cell cell = createCell(region); + // can push since we are in the first range + assertTrue(checker.canPush(createEntry(region, 100), cell)); + setState(region, RegionState.State.OPENING); + // can not push since we are in the last range and the state is OPENING + assertFalse(checker.canPush(createEntry(region, 102), cell)); + 
addStateAndBarrier(region, RegionState.State.OPEN, 50); + // can not push since the previous range has not been finished yet + assertFalse(checker.canPush(createEntry(region, 102), cell)); + updatePushedSeqId(region, 49); + // can push since the previous range has been finished + assertTrue(checker.canPush(createEntry(region, 102), cell)); + setState(region, RegionState.State.OPENING); + assertFalse(checker.canPush(createEntry(region, 104), cell)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 2146e47..eb7d5a0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -21,13 +21,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Mockito.when; import java.io.IOException; import java.util.NavigableMap; -import java.util.NoSuchElementException; import java.util.OptionalLong; import java.util.TreeMap; import java.util.concurrent.PriorityBlockingQueue; @@ -40,13 +40,13 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import 
org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -180,15 +180,12 @@ public class TestWALEntryStream { new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { // There's one edit in the log, read it. Reading past it needs to throw exception assertTrue(entryStream.hasNext()); - WAL.Entry entry = entryStream.next(); + WAL.Entry entry = entryStream.peek(); + assertSame(entry, entryStream.next()); assertNotNull(entry); assertFalse(entryStream.hasNext()); - try { - entry = entryStream.next(); - fail(); - } catch (NoSuchElementException e) { - // expected - } + assertNull(entryStream.peek()); + assertNull(entryStream.next()); oldPos = entryStream.getPosition(); } @@ -346,10 +343,12 @@ public class TestWALEntryStream { // start up a batcher ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + Server mockServer= Mockito.mock(Server.class); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.getSourceManager()).thenReturn(mockSourceManager); when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); when(source.getWALFileLengthProvider()).thenReturn(log); + when(source.getServer()).thenReturn(mockServer); ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf, walQueue, 0, 
getDummyFilter(), source); Path walPath = walQueue.peek();