Revert "HBASE-17314 Limit total buffered size for all replication sources"
This reverts commit 3826e639672eea11d73da333e6c15f6b7c23a46c. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a1d2ff46 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a1d2ff46 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a1d2ff46 Branch: refs/heads/hbase-12439 Commit: a1d2ff4646743a9136bb1182c0512bce28e358b7 Parents: acd0218 Author: Michael Stack <st...@apache.org> Authored: Wed Dec 21 11:17:28 2016 -0800 Committer: Michael Stack <st...@apache.org> Committed: Wed Dec 21 11:17:28 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/HConstants.java | 4 - .../hbase/regionserver/HRegionServer.java | 3 +- .../regionserver/ReplicationSource.java | 38 +--- .../regionserver/ReplicationSourceManager.java | 8 - .../replication/TestReplicationEndpoint.java | 3 +- .../regionserver/TestGlobalThrottler.java | 184 ------------------- 6 files changed, 10 insertions(+), 230 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index dc96c2a..48d9778 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -932,10 +932,6 @@ public final class HConstants { public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000; - public static final String REPLICATION_SOURCE_TOTAL_BUFFER_KEY = "replication.total.buffer.quota"; - public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024; - - /** * Directory where the source cluster file system client configuration are placed which is used by * sink cluster to copy HFiles from source cluster file system http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 853d699..5bc0a66 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2340,8 +2340,7 @@ public class HRegionServer extends HasThread implements * @return Return the object that implements the replication * source service. */ - @VisibleForTesting - public ReplicationSourceService getReplicationSourceService() { + ReplicationSourceService getReplicationSourceService() { return replicationSourceHandler; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 3fb5f94..f777282 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -38,7 +38,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; @@ -151,9 +150,6 @@ public class ReplicationSource extends Thread private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads = new ConcurrentHashMap<String, ReplicationSourceWorkerThread>(); - private AtomicInteger totalBufferUsed; - private int totalBufferQuota; - /** * Instantiation method used by region servers * @@ -205,9 +201,7 @@ public class ReplicationSource extends Thread defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); - this.totalBufferUsed = manager.getTotalBufferUsed(); - this.totalBufferQuota = conf.getInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, - HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth=" @@ -542,7 +536,7 @@ public class ReplicationSource extends Thread private boolean workerRunning = true; // Current number of hfiles that we need to replicate private long currentNbHFiles = 0; - List<WAL.Entry> entries; + // Use guava cache to set ttl for each key private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder() .expireAfterAccess(1, TimeUnit.DAYS).build( @@ -562,7 +556,6 @@ public class ReplicationSource extends Thread this.replicationQueueInfo = replicationQueueInfo; this.repLogReader = new ReplicationWALReaderManager(fs, conf); this.source = source; - this.entries = new ArrayList<>(); } @Override @@ -635,7 +628,8 @@ public class ReplicationSource extends Thread boolean gotIOE = false; currentNbOperations = 0; currentNbHFiles = 0; - entries.clear(); + List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1); + Map<String, Long> lastPositionsForSerialScope = new HashMap<>(); currentSize = 0; try { @@ -727,7 +721,6 @@ public class ReplicationSource extends Thread continue; } shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope); - releaseBufferQuota(); } if (replicationQueueInfo.isQueueRecovered()) { // use synchronize to make sure one last thread will clean the queue @@ -817,7 +810,7 @@ public class ReplicationSource extends Thread } } } - boolean totalBufferTooLarge = false; + // don't replicate if the log entries have already been consumed by the cluster if (replicationEndpoint.canReplicateToSameCluster() || !entry.getKey().getClusterIds().contains(peerClusterId)) { @@ -835,16 +828,15 @@ public class ReplicationSource extends Thread logKey.addClusterId(clusterId); currentNbOperations += countDistinctRowKeys(edit); entries.add(entry); - int delta = (int)entry.getEdit().heapSize() + calculateTotalSizeOfStoreFiles(edit); - currentSize += delta; - totalBufferTooLarge = acquireBufferQuota(delta); + currentSize += entry.getEdit().heapSize(); + currentSize += calculateTotalSizeOfStoreFiles(edit); } else { metrics.incrLogEditsFiltered(); } } // Stop if too many entries or too big // FIXME check the relationship between single wal group and overall - if (totalBufferTooLarge || currentSize >= replicationQueueSizeCapacity + if (currentSize >= replicationQueueSizeCapacity || entries.size() >= replicationQueueNbCapacity) { break; } @@ -1325,19 +1317,5 @@ public class ReplicationSource extends Thread public void setWorkerRunning(boolean workerRunning) { this.workerRunning = workerRunning; } - - /** - * @param size delta size for grown buffer - * @return true if we should clear buffer and push all - */ - private boolean acquireBufferQuota(int size) { - return totalBufferUsed.addAndGet(size) >= totalBufferQuota; - } - - private void releaseBufferQuota() { - totalBufferUsed.addAndGet(-currentSize); - currentSize = 0; - entries.clear(); - } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 2634a52..2c9fdcc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -42,7 +42,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -127,8 +126,6 @@ public class ReplicationSourceManager implements ReplicationListener { private Connection connection; private long replicationWaitTime; - private AtomicInteger totalBufferUsed = new AtomicInteger(); - /** * Creates a replication manager and sets the watch on all the other registered region servers * @param replicationQueues the interface for manipulating replication queues @@ -438,11 +435,6 @@ public class ReplicationSourceManager implements ReplicationListener { } } - @VisibleForTesting - AtomicInteger getTotalBufferUsed() { - return totalBufferUsed; - } - /** * Factory method to create a replication source * @param conf the configuration to use http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index f9c467e..002b8c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -362,7 +361,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { @Override public boolean replicate(ReplicateContext replicateContext) { replicateCount.incrementAndGet(); - lastEntries = new ArrayList<>(replicateContext.entries); + lastEntries = replicateContext.entries; return true; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a1d2ff46/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java deleted file mode 100644 index a40d7ed..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.replication.regionserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.HTestConst; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, LargeTests.class }) -public class TestGlobalThrottler { - private static final Log LOG = LogFactory.getLog(TestGlobalThrottler.class); - private static Configuration conf1; - private static Configuration conf2; - - private static HBaseTestingUtility utility1; - private static HBaseTestingUtility utility2; - - private static final byte[] famName = Bytes.toBytes("f"); - private static final byte[] VALUE = Bytes.toBytes("v"); - private static final byte[] ROW = Bytes.toBytes("r"); - private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf1 = HBaseConfiguration.create(); - conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); - conf1.setLong("replication.source.sleepforretries", 100); - // Each WAL is about 120 bytes - conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200); - conf1.setLong("replication.source.per.peer.node.bandwidth", 100L); - - utility1 = new HBaseTestingUtility(conf1); - utility1.startMiniZKCluster(); - MiniZooKeeperCluster miniZK = utility1.getZkCluster(); - new ZooKeeperWatcher(conf1, "cluster1", null, true); - - conf2 = new Configuration(conf1); - conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); - - utility2 = new HBaseTestingUtility(conf2); - utility2.setZkCluster(miniZK); - new ZooKeeperWatcher(conf2, "cluster2", null, true); - - ReplicationAdmin admin1 = new ReplicationAdmin(conf1); - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(utility2.getClusterKey()); - admin1.addPeer("peer1", rpc, null); - admin1.addPeer("peer2", rpc, null); - admin1.addPeer("peer3", rpc, null); - - utility1.startMiniCluster(1, 1); - utility2.startMiniCluster(1, 1); - } - - @AfterClass - public static void setDownAfterClass() throws Exception { - utility2.shutdownMiniCluster(); - utility1.shutdownMiniCluster(); - } - - - volatile private boolean testQuotaPass = false; - volatile private boolean testQuotaNonZero = false; - @Test - public void testQuota() throws IOException { - TableName tableName = TableName.valueOf("testQuota"); - HTableDescriptor table = new HTableDescriptor(tableName); - HColumnDescriptor fam = new HColumnDescriptor(famName); - fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL); - table.addFamily(fam); - utility1.getHBaseAdmin().createTable(table); - utility2.getHBaseAdmin().createTable(table); - - Thread watcher = new Thread(()->{ - Replication replication = (Replication)utility1.getMiniHBaseCluster() - .getRegionServer(0).getReplicationSourceService(); - AtomicInteger bufferUsed = replication.getReplicationManager().getTotalBufferUsed(); - testQuotaPass = true; - while (!Thread.interrupted()) { - int size = bufferUsed.get(); - if (size > 0) { - testQuotaNonZero = true; - } - if (size > 600) { - // We read logs first then check throttler, so if the buffer quota limiter doesn't - // take effect, it will push many logs and exceed the quota. - testQuotaPass = false; - } - Threads.sleep(50); - } - }); - watcher.start(); - - try(Table t1 = utility1.getConnection().getTable(tableName); - Table t2 = utility2.getConnection().getTable(tableName)) { - for (int i = 0; i < 50; i++) { - Put put = new Put(ROWS[i]); - put.addColumn(famName, VALUE, VALUE); - t1.put(put); - } - long start = EnvironmentEdgeManager.currentTime(); - while (EnvironmentEdgeManager.currentTime() - start < 180000) { - Scan scan = new Scan(); - scan.setCaching(50); - int count = 0; - try (ResultScanner results = t2.getScanner(scan)) { - for (Result result : results) { - count++; - } - } - if (count < 50) { - LOG.info("Waiting for all logs pushed to slave. Expected 50 , actual " + count); - Threads.sleep(200); - continue; - } - break; - } - } - - watcher.interrupt(); - Assert.assertTrue(testQuotaPass); - Assert.assertTrue(testQuotaNonZero); - } - - private List<Integer> getRowNumbers(List<Cell> cells) { - List<Integer> listOfRowNumbers = new ArrayList<>(); - for (Cell c : cells) { - listOfRowNumbers.add(Integer.parseInt(Bytes - .toString(c.getRowArray(), c.getRowOffset() + ROW.length, - c.getRowLength() - ROW.length))); - } - return listOfRowNumbers; - } -}