HBASE-20481 Replicate entries from same region serially in ReplicationEndpoint for serial replication
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6225b4a4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6225b4a4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6225b4a4 Branch: refs/heads/HBASE-19064 Commit: 6225b4a492c40a03475b666915b96984b25b3c47 Parents: 9b9f851 Author: huzheng <open...@gmail.com> Authored: Wed May 2 10:44:42 2018 +0800 Committer: huzheng <open...@gmail.com> Committed: Fri May 4 15:22:02 2018 +0800 ---------------------------------------------------------------------- .../hbase/replication/ReplicationEndpoint.java | 2 +- .../HBaseInterClusterReplicationEndpoint.java | 281 +++++++++--------- .../TestReplicationAdminWithClusters.java | 1 - .../replication/TestReplicationEndpoint.java | 36 +-- .../regionserver/TestReplicator.java | 288 +++---------------- .../TestSerialReplicationEndpoint.java | 188 ++++++++++++ 6 files changed, 384 insertions(+), 412 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index 543dc2f..f4c37b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -117,7 +117,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { /** * Initialize the replication endpoint with the given context. * @param context replication context - * @throws IOException + * @throws IOException error occur when initialize the endpoint. 
*/ void init(Context context) throws IOException; http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index fd3c671..7db53aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -24,9 +24,9 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; @@ -37,6 +37,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -108,6 +111,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private boolean replicationBulkLoadDataEnabled; private Abortable abortable; private boolean dropOnDeletedTables; + private boolean isSerial = false; @Override public void init(Context context) throws IOException { @@ -160,6 +164,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi Path 
baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR); baseNamespaceDir = new Path(rootDir, baseNSDir); hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir)); + isSerial = context.getPeerConfig().isSerial(); } private void decorateConf() { @@ -203,40 +208,60 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi return sleepMultiplier < maxRetriesMultiplier; } - private List<List<Entry>> createBatches(final List<Entry> entries) { + private int getEstimatedEntrySize(Entry e) { + long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf(); + return (int) size; + } + + private List<List<Entry>> createParallelBatches(final List<Entry> entries) { int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1); - int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks); - // Maintains the current batch for a given partition index - Map<Integer, List<Entry>> entryMap = new HashMap<>(n); - List<List<Entry>> entryLists = new ArrayList<>(); + int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks); + List<List<Entry>> entryLists = + Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList()); int[] sizes = new int[n]; - - for (int i = 0; i < n; i++) { - entryMap.put(i, new ArrayList<Entry>(entries.size()/n+1)); - } - - for (Entry e: entries) { - int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n); - int entrySize = (int)e.getKey().estimatedSerializedSizeOf() + - (int)e.getEdit().estimatedSerializedSizeOf(); - // If this batch is oversized, add it to final list and initialize a new empty batch - if (sizes[index] > 0 /* must include at least one entry */ && - sizes[index] + entrySize > replicationRpcLimit) { - entryLists.add(entryMap.get(index)); - entryMap.put(index, new ArrayList<Entry>()); + for (Entry e : entries) { + int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n); + 
int entrySize = getEstimatedEntrySize(e); + // If this batch has at least one entry and is over sized, move it to the tail of list and + // initialize the entryLists[index] to be a empty list. + if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) { + entryLists.add(entryLists.get(index)); + entryLists.set(index, new ArrayList<>()); sizes[index] = 0; } - entryMap.get(index).add(e); + entryLists.get(index).add(e); sizes[index] += entrySize; } - - entryLists.addAll(entryMap.values()); return entryLists; } + private List<List<Entry>> createSerialBatches(final List<Entry> entries) { + Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (Entry e : entries) { + regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>()) + .add(e); + } + return new ArrayList<>(regionEntries.values()); + } + + /** + * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool + * concurrently. Note that, for serial replication, we need to make sure that entries from the + * same region to be replicated serially, so entries from the same region consist of a batch, and + * we will divide a batch into several batches by replicationRpcLimit in method + * serialReplicateRegionEntries() + */ + private List<List<Entry>> createBatches(final List<Entry> entries) { + if (isSerial) { + return createSerialBatches(entries); + } else { + return createParallelBatches(entries); + } + } + private TableName parseTable(String msg) { // ... TableNotFoundException: '<table>'/n... 
- Pattern p = Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'"); + Pattern p = Pattern.compile("TableNotFoundException: '([\\S]*)'"); Matcher m = p.matcher(msg); if (m.find()) { String table = m.group(1); @@ -252,17 +277,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // Filter a set of batches by TableName private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) { - List<List<Entry>> entryLists = new ArrayList<>(); - for (List<Entry> entries : oldEntryList) { - ArrayList<Entry> thisList = new ArrayList<Entry>(entries.size()); - entryLists.add(thisList); - for (Entry e : entries) { - if (!e.getKey().getTableName().equals(table)) { - thisList.add(e); - } - } - } - return entryLists; + return oldEntryList + .stream().map(entries -> entries.stream() + .filter(e -> !e.getKey().getTableName().equals(table)).collect(Collectors.toList())) + .collect(Collectors.toList()); } private void reconnectToPeerCluster() { @@ -277,13 +295,55 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } } + private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext, + List<List<Entry>> batches) throws IOException { + int futures = 0; + for (int i = 0; i < batches.size(); i++) { + List<Entry> entries = batches.get(i); + if (!entries.isEmpty()) { + LOG.trace("Submitting {} entries of total size {}", entries.size(), + replicateContext.getSize()); + // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource + pool.submit(createReplicator(entries, i)); + futures++; + } + } + + IOException iox = null; + long lastWriteTime = 0; + for (int i = 0; i < futures; i++) { + try { + // wait for all futures, remove successful parts + // (only the remaining parts will be retried) + Future<Integer> f = pool.take(); + int index = f.get(); + List<Entry> batch = batches.get(index); + batches.set(index, Collections.emptyList()); // 
remove successful batch + // Find the most recent write time in the batch + long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime(); + if (writeTime > lastWriteTime) { + lastWriteTime = writeTime; + } + } catch (InterruptedException ie) { + iox = new IOException(ie); + } catch (ExecutionException ee) { + // cause must be an IOException + iox = (IOException) ee.getCause(); + } + } + if (iox != null) { + // if we had any exceptions, try again + throw iox; + } + return lastWriteTime; + } + /** * Do the shipping logic */ @Override public boolean replicate(ReplicateContext replicateContext) { CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec); - List<List<Entry>> batches; String walGroupId = replicateContext.getWalGroupId(); int sleepMultiplier = 1; @@ -294,13 +354,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi int numSinks = replicationSinkMgr.getNumSinks(); if (numSinks == 0) { - LOG.warn("No replication sinks found, returning without replicating. The source should retry" - + " with the same set of edits."); + LOG.warn("No replication sinks found, returning without replicating. 
The source should " + + "retry with the same set of edits."); return false; } - batches = createBatches(replicateContext.getEntries()); - + List<List<Entry>> batches = createBatches(replicateContext.getEntries()); while (this.isRunning() && !exec.isShutdown()) { if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { @@ -312,52 +371,16 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi reconnectToPeerCluster(); } try { - int futures = 0; - for (int i=0; i<batches.size(); i++) { - List<Entry> entries = batches.get(i); - if (!entries.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Submitting " + entries.size() + - " entries of total size " + replicateContext.getSize()); - } - // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource - pool.submit(createReplicator(entries, i)); - futures++; - } - } - IOException iox = null; - - long lastWriteTime = 0; - for (int i=0; i<futures; i++) { - try { - // wait for all futures, remove successful parts - // (only the remaining parts will be retried) - Future<Integer> f = pool.take(); - int index = f.get().intValue(); - List<Entry> batch = batches.get(index); - batches.set(index, Collections.<Entry>emptyList()); // remove successful batch - // Find the most recent write time in the batch - long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime(); - if (writeTime > lastWriteTime) { - lastWriteTime = writeTime; - } - } catch (InterruptedException ie) { - iox = new IOException(ie); - } catch (ExecutionException ee) { - // cause must be an IOException - iox = (IOException)ee.getCause(); - } - } - if (iox != null) { - // if we had any exceptions, try again - throw iox; - } + long lastWriteTime; + + // replicate the batches to sink side. 
+ lastWriteTime = parallelReplicate(pool, replicateContext, batches); + // update metrics if (lastWriteTime > 0) { this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId); } return true; - } catch (IOException ioe) { // Didn't ship anything, but must still age the last time we did this.metrics.refreshAgeOfLastShippedOp(walGroupId); @@ -376,7 +399,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // Would potentially be better to retry in one of the outer loops // and add a table filter there; but that would break the encapsulation, // so we're doing the filtering here. - LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for '"+table+"'"); + LOG.info("Missing table detected at sink, local table also does not exist, " + + "filtering edits for '" + table + "'"); batches = filterBatches(batches, table); continue; } @@ -396,8 +420,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // happened, the cluster is alive and calling it right away // even for a test just makes things worse. sleepForRetries("Encountered a SocketTimeoutException. 
Since the " + - "call to the remote cluster timed out, which is usually " + - "caused by a machine failure or a massive slowdown", + "call to the remote cluster timed out, which is usually " + + "caused by a machine failure or a massive slowdown", this.socketTimeoutMultiplier); } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) { LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); @@ -420,7 +444,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi @Override protected void doStop() { - disconnect(); //don't call super.doStop() + disconnect(); // don't call super.doStop() if (this.conn != null) { try { this.conn.close(); @@ -446,61 +470,58 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } @VisibleForTesting - protected Replicator createReplicator(List<Entry> entries, int ordinal) { - return new Replicator(entries, ordinal); - } - - @VisibleForTesting - protected class Replicator implements Callable<Integer> { - private List<Entry> entries; - private int ordinal; - public Replicator(List<Entry> entries, int ordinal) { - this.entries = entries; - this.ordinal = ordinal; - } - - protected void replicateEntries(BlockingInterface rrs, final List<Entry> batch, - String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir) - throws IOException { + protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException { + SinkPeer sinkPeer = null; + try { + int entriesHashCode = System.identityHashCode(entries); if (LOG.isTraceEnabled()) { - long size = 0; - for (Entry e: entries) { - size += e.getKey().estimatedSerializedSizeOf(); - size += e.getEdit().estimatedSerializedSizeOf(); - } - LOG.trace("Replicating batch " + System.identityHashCode(entries) + " of " + - entries.size() + " entries with total size " + size + " bytes to " + - replicationClusterId); + long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum(); + 
LOG.trace("Replicating batch {} of {} entries with total size {} bytes to {}", + entriesHashCode, entries.size(), size, replicationClusterId); } + sinkPeer = replicationSinkMgr.getReplicationSink(); + BlockingInterface rrs = sinkPeer.getRegionServer(); try { - ReplicationProtbufUtil.replicateWALEntry(rrs, batch.toArray(new Entry[batch.size()]), + ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - if (LOG.isTraceEnabled()) { - LOG.trace("Completed replicating batch " + System.identityHashCode(entries)); - } + LOG.trace("Completed replicating batch {}", entriesHashCode); } catch (IOException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Failed replicating batch " + System.identityHashCode(entries), e); - } + LOG.trace("Failed replicating batch {}", entriesHashCode, e); throw e; } + replicationSinkMgr.reportSinkSuccess(sinkPeer); + } catch (IOException ioe) { + if (sinkPeer != null) { + replicationSinkMgr.reportBadSink(sinkPeer); + } + throw ioe; } + return batchIndex; + } - @Override - public Integer call() throws IOException { - SinkPeer sinkPeer = null; - try { - sinkPeer = replicationSinkMgr.getReplicationSink(); - BlockingInterface rrs = sinkPeer.getRegionServer(); - replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir); - replicationSinkMgr.reportSinkSuccess(sinkPeer); - return ordinal; - } catch (IOException ioe) { - if (sinkPeer != null) { - replicationSinkMgr.reportBadSink(sinkPeer); - } - throw ioe; + private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex) + throws IOException { + int batchSize = 0, index = 0; + List<Entry> batch = new ArrayList<>(); + for (Entry entry : entries) { + int entrySize = getEstimatedEntrySize(entry); + if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) { + replicateEntries(batch, index++); + batch.clear(); + batchSize = 0; } + batch.add(entry); + batchSize += 
entrySize; + } + if (batchSize > 0) { + replicateEntries(batch, index); } + return batchIndex; + } + + @VisibleForTesting + protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex) { + return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex) + : () -> replicateEntries(entries, batchIndex); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index 7be8c16..268fe00 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -297,7 +297,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { notifyStopped(); } - @Override public UUID getPeerUUID() { return UUID.randomUUID(); http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 3fca0ec..a3c20d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import 
java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -447,40 +448,15 @@ public class TestReplicationEndpoint extends TestReplicationBase { } @Override - protected Replicator createReplicator(List<Entry> entries, int ordinal) { + protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) { // Fail only once, we don't want to slow down the test. if (failedOnce) { - return new DummyReplicator(entries, ordinal); + return () -> ordinal; } else { failedOnce = true; - return new FailingDummyReplicator(entries, ordinal); - } - } - - protected class DummyReplicator extends Replicator { - - private int ordinal; - - public DummyReplicator(List<Entry> entries, int ordinal) { - super(entries, ordinal); - this.ordinal = ordinal; - } - - @Override - public Integer call() throws IOException { - return ordinal; - } - } - - protected class FailingDummyReplicator extends DummyReplicator { - - public FailingDummyReplicator(List<Entry> entries, int ordinal) { - super(entries, ordinal); - } - - @Override - public Integer call() throws IOException { - throw new IOException("Sample Exception: Failed to replicate."); + return () -> { + throw new IOException("Sample Exception: Failed to replicate."); + }; } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java index d8db3b1..24329a0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java @@ -21,9 +21,9 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; @@ -39,57 +39,14 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; -import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; -import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; - @Category(MediumTests.class) -@Ignore("Flaky, needs to be rewritten, see HBASE-19125") public class TestReplicator extends TestReplicationBase { @ClassRule @@ -104,7 +61,6 @@ public class TestReplicator extends TestReplicationBase { // Set RPC size limit to 10kb (will be applied to both source and sink clusters) conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10); TestReplicationBase.setUpBeforeClass(); - admin.removePeer("2"); // Remove the peer set up for us by base class } @Test @@ -116,7 +72,8 @@ public class TestReplicator extends TestReplicationBase { // Replace the peer set up for us by the base class with a wrapper for this test admin.addPeer("testReplicatorBatching", new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey()) - .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); + .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), + null); ReplicationEndpointForTest.setBatchCount(0); ReplicationEndpointForTest.setEntriesCount(0); @@ -125,11 +82,10 @@ public class TestReplicator extends TestReplicationBase { try { // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all // have to be replicated separately. 
- final byte[] valueBytes = new byte[8 *1024]; + final byte[] valueBytes = new byte[8 * 1024]; for (int i = 0; i < NUM_ROWS; i++) { - htable1.put(new Put(Bytes.toBytes("row"+Integer.toString(i))) - .addColumn(famName, null, valueBytes) - ); + htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null, + valueBytes)); } } finally { ReplicationEndpointForTest.resume(); @@ -151,8 +107,7 @@ public class TestReplicator extends TestReplicationBase { assertEquals("We sent an incorrect number of batches", NUM_ROWS, ReplicationEndpointForTest.getBatchCount()); - assertEquals("We did not replicate enough rows", NUM_ROWS, - utility2.countRows(htable2)); + assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2)); } finally { admin.removePeer("testReplicatorBatching"); } @@ -168,7 +123,7 @@ public class TestReplicator extends TestReplicationBase { admin.addPeer("testReplicatorWithErrors", new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey()) .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()), - null); + null); FailureInjectingReplicationEndpointForTest.setBatchCount(0); FailureInjectingReplicationEndpointForTest.setEntriesCount(0); @@ -177,11 +132,10 @@ public class TestReplicator extends TestReplicationBase { try { // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all // have to be replicated separately. 
- final byte[] valueBytes = new byte[8 *1024]; + final byte[] valueBytes = new byte[8 * 1024]; for (int i = 0; i < NUM_ROWS; i++) { - htable1.put(new Put(Bytes.toBytes("row"+Integer.toString(i))) - .addColumn(famName, null, valueBytes) - ); + htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null, + valueBytes)); } } finally { FailureInjectingReplicationEndpointForTest.resume(); @@ -201,8 +155,7 @@ public class TestReplicator extends TestReplicationBase { } }); - assertEquals("We did not replicate enough rows", NUM_ROWS, - utility2.countRows(htable2)); + assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2)); } finally { admin.removePeer("testReplicatorWithErrors"); } @@ -221,8 +174,8 @@ public class TestReplicator extends TestReplicationBase { public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint { - private static AtomicInteger batchCount = new AtomicInteger(0); - private static int entriesCount; + protected static AtomicInteger batchCount = new AtomicInteger(0); + protected static int entriesCount; private static final Object latch = new Object(); private static AtomicBoolean useLatch = new AtomicBoolean(false); @@ -240,7 +193,7 @@ public class TestReplicator extends TestReplicationBase { public static void await() throws InterruptedException { if (useLatch.get()) { LOG.info("Waiting on latch"); - synchronized(latch) { + synchronized (latch) { latch.wait(); } LOG.info("Waited on latch, now proceeding"); @@ -265,38 +218,6 @@ public class TestReplicator extends TestReplicationBase { entriesCount = i; } - public class ReplicatorForTest extends Replicator { - - public ReplicatorForTest(List<Entry> entries, int ordinal) { - super(entries, ordinal); - } - - @Override - protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries, - String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir) - throws IOException { - try { - long size 
= 0; - for (Entry e: entries) { - size += e.getKey().estimatedSerializedSizeOf(); - size += e.getEdit().estimatedSerializedSizeOf(); - } - LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " + - entries.size() + " entries with total size " + size + " bytes to " + - replicationClusterId); - super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, - hfileArchiveDir); - entriesCount += entries.size(); - int count = batchCount.incrementAndGet(); - LOG.info("Completed replicating batch " + System.identityHashCode(entries) + - " count=" + count); - } catch (IOException e) { - LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e); - throw e; - } - } - } - @Override public boolean replicate(ReplicateContext replicateContext) { try { @@ -308,170 +229,37 @@ public class TestReplicator extends TestReplicationBase { } @Override - protected Replicator createReplicator(List<Entry> entries, int ordinal) { - return new ReplicatorForTest(entries, ordinal); + protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) { + return () -> { + int batchIndex = replicateEntries(entries, ordinal); + entriesCount += entries.size(); + int count = batchCount.incrementAndGet(); + LOG.info( + "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count); + return batchIndex; + }; } } public static class FailureInjectingReplicationEndpointForTest extends ReplicationEndpointForTest { + private final AtomicBoolean failNext = new AtomicBoolean(false); - static class FailureInjectingBlockingInterface implements BlockingInterface { - - private final BlockingInterface delegate; - private volatile boolean failNext; - - public FailureInjectingBlockingInterface(BlockingInterface delegate) { - this.delegate = delegate; - } - - @Override - public GetRegionInfoResponse getRegionInfo(RpcController controller, - GetRegionInfoRequest request) throws ServiceException { - return 
delegate.getRegionInfo(controller, request); - } - - @Override - public GetStoreFileResponse getStoreFile(RpcController controller, - GetStoreFileRequest request) throws ServiceException { - return delegate.getStoreFile(controller, request); - } - - @Override - public GetOnlineRegionResponse getOnlineRegion(RpcController controller, - GetOnlineRegionRequest request) throws ServiceException { - return delegate.getOnlineRegion(controller, request); - } - - @Override - public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request) - throws ServiceException { - return delegate.openRegion(controller, request); - } - - @Override - public WarmupRegionResponse warmupRegion(RpcController controller, - WarmupRegionRequest request) throws ServiceException { - return delegate.warmupRegion(controller, request); - } - - @Override - public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request) - throws ServiceException { - return delegate.closeRegion(controller, request); - } - - @Override - public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request) - throws ServiceException { - return delegate.flushRegion(controller, request); - } - - @Override - public CompactRegionResponse compactRegion(RpcController controller, - CompactRegionRequest request) throws ServiceException { - return delegate.compactRegion(controller, request); - } - - @Override - public ReplicateWALEntryResponse replicateWALEntry(RpcController controller, - ReplicateWALEntryRequest request) throws ServiceException { - if (!failNext) { - failNext = true; - return delegate.replicateWALEntry(controller, request); - } else { - failNext = false; + @Override + protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) { + return () -> { + if (failNext.compareAndSet(false, true)) { + int batchIndex = replicateEntries(entries, ordinal); + entriesCount += entries.size(); + int count = batchCount.incrementAndGet(); 
+ LOG.info( + "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count); + return batchIndex; + } else if (failNext.compareAndSet(true, false)) { throw new ServiceException("Injected failure"); } - } - - @Override - public ReplicateWALEntryResponse replay(RpcController controller, - ReplicateWALEntryRequest request) throws ServiceException { - return delegate.replay(controller, request); - } - - @Override - public RollWALWriterResponse rollWALWriter(RpcController controller, - RollWALWriterRequest request) throws ServiceException { - return delegate.rollWALWriter(controller, request); - } - - @Override - public GetServerInfoResponse getServerInfo(RpcController controller, - GetServerInfoRequest request) throws ServiceException { - return delegate.getServerInfo(controller, request); - } - - @Override - public StopServerResponse stopServer(RpcController controller, StopServerRequest request) - throws ServiceException { - return delegate.stopServer(controller, request); - } - - @Override - public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, - UpdateFavoredNodesRequest request) throws ServiceException { - return delegate.updateFavoredNodes(controller, request); - } - - @Override - public UpdateConfigurationResponse updateConfiguration(RpcController controller, - UpdateConfigurationRequest request) throws ServiceException { - return delegate.updateConfiguration(controller, request); - } - - @Override - public GetRegionLoadResponse getRegionLoad(RpcController controller, - GetRegionLoadRequest request) throws ServiceException { - return delegate.getRegionLoad(controller, request); - } - - @Override - public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, - ClearCompactionQueuesRequest request) throws ServiceException { - return delegate.clearCompactionQueues(controller, request); - } - - @Override - public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, - 
GetSpaceQuotaSnapshotsRequest request) throws ServiceException { - return delegate.getSpaceQuotaSnapshots(controller, request); - } - - @Override - public ExecuteProceduresResponse executeProcedures(RpcController controller, - ExecuteProceduresRequest request) - throws ServiceException { - return null; - } - - @Override - public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, - ClearRegionBlockCacheRequest request) throws ServiceException { - return delegate.clearRegionBlockCache(controller, request); - } - } - - public class FailureInjectingReplicatorForTest extends ReplicatorForTest { - - public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) { - super(entries, ordinal); - } - - @Override - protected void replicateEntries(BlockingInterface rrs, List<Entry> entries, - String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir) - throws IOException { - super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries, - replicationClusterId, baseNamespaceDir, hfileArchiveDir); - } - } - - @Override - protected Replicator createReplicator(List<Entry> entries, int ordinal) { - return new FailureInjectingReplicatorForTest(entries, ordinal); + return ordinal; + }; } } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java new file mode 100644 index 0000000..7d59d38 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import 
org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestSerialReplicationEndpoint {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSerialReplicationEndpoint.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static Configuration CONF;
+  private static Connection CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster();
+    CONF = UTIL.getConfiguration();
+    CONF.setLong(RpcServer.MAX_REQUEST_SIZE, 102400);
+    CONN = UTIL.getConnection();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IOUtils.closeQuietly(CONN);
+    UTIL.shutdownMiniCluster();
+  }
+
+  private String getZKClusterKey() {
+    return String.format("127.0.0.1:%d:%s", UTIL.getZkCluster().getClientPort(),
+      CONF.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+  }
+
+  /** Writes rows, waits for TestEndpoint to receive them all, then verifies their order. */
+  private void testHBaseReplicationEndpoint(String tableNameStr, String peerId, boolean isSerial)
+      throws IOException {
+    TestEndpoint.reset();
+    int cellNum = 10000;
+
+    TableName tableName = TableName.valueOf(tableNameStr);
+    byte[] family = Bytes.toBytes("f");
+    byte[] qualifier = Bytes.toBytes("q");
+    TableDescriptor td =
+        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+            .newBuilder(family).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
+    UTIL.createTable(td, null);
+
+    try (Admin admin = CONN.getAdmin()) {
+      ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+          .setClusterKey(getZKClusterKey()).setReplicationEndpointImpl(TestEndpoint.class.getName())
+          .setReplicateAllUserTables(false).setSerial(isSerial)
+          .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())).build();
+      admin.addReplicationPeer(peerId, peerConfig);
+    }
+
+    try (Table table = CONN.getTable(tableName)) {
+      for (int i = 0; i < cellNum; i++) {
+        Put put = new Put(Bytes.toBytes(i)).addColumn(family, qualifier, System.currentTimeMillis(),
+          Bytes.toBytes(i));
+        table.put(put);
+      }
+    }
+    Waiter.waitFor(CONF, 60000, () -> TestEndpoint.getEntries().size() >= cellNum);
+
+    int index = 0;
+    // getEntries() returns a fresh copy on every call, so work on a single snapshot: sorting a
+    // throw-away copy would leave the list that is verified below unsorted.
+    List<Entry> entries = TestEndpoint.getEntries();
+    Assert.assertEquals(cellNum, entries.size());
+    if (!isSerial) {
+      Collections.sort(entries,
+        (a, b) -> Long.compare(a.getKey().getSequenceId(), b.getKey().getSequenceId()));
+    }
+    for (Entry entry : entries) {
+      Assert.assertEquals(tableName, entry.getKey().getTableName());
+      Assert.assertEquals(1, entry.getEdit().getCells().size());
+      Cell cell = entry.getEdit().getCells().get(0);
+      Assert.assertArrayEquals(Bytes.toBytes(index),
+        Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+      index++;
+    }
+    Assert.assertEquals(cellNum, index);
+  }
+
+  @Test
+  public void testSerialReplicate() throws Exception {
+    testHBaseReplicationEndpoint("testSerialReplicate", "100", true);
+  }
+
+  @Test
+  public void testParallelReplicate() throws Exception {
+    testHBaseReplicationEndpoint("testParallelReplicate", "101", false);
+  }
+
+  public static class TestEndpoint extends HBaseInterClusterReplicationEndpoint {
+
+    private static final BlockingQueue<Entry> entryQueue = new LinkedBlockingQueue<>();
+
+    public static void reset() {
+      entryQueue.clear();
+    }
+
+    public static List<Entry> getEntries() {
+      return new ArrayList<>(entryQueue);
+    }
+
+    @Override
+    public boolean canReplicateToSameCluster() {
+      return true;
+    }
+
+    @Override
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+      return () -> {
+        entryQueue.addAll(entries);
+        return ordinal;
+      };
+    }
+
+    @Override
+    public synchronized List<ServerName> getRegionServers() {
+      // Return multiple distinct server names for endpoint parallel replication.
+      return new ArrayList<>(
+          ImmutableList.of(ServerName.valueOf("www.example.com", 12016, 1525245876026L),
+            ServerName.valueOf("www.example2.com", 12016, 1525245876026L),
+            ServerName.valueOf("www.example3.com", 12016, 1525245876026L),
+            ServerName.valueOf("www.example4.com", 12016, 1525245876026L),
+            ServerName.valueOf("www.example5.com", 12016, 1525245876026L)));
+    }
+  }
+}