This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch branch-2.2 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push: new 603d2b6 HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2139) 603d2b6 is described below commit 603d2b629aeb749b2ca009dbf54149d7240bb269 Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Mon Jul 27 20:15:51 2020 +0530 HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2139) Closes #2127 Signed-off-by: stack <st...@apache.org> --- .../java/org/apache/hadoop/hbase/HConstants.java | 10 ++++ .../hadoop/hbase/regionserver/RSRpcServices.java | 12 +---- .../replication/regionserver/Replication.java | 2 +- .../replication/regionserver/ReplicationSink.java | 60 ++++++++++++---------- .../hbase/regionserver/TestMultiLogThreshold.java | 5 +- .../regionserver/TestReplicationSink.java | 49 +++++++++++++++--- .../regionserver/TestWALEntrySinkFilter.java | 2 +- 7 files changed, 91 insertions(+), 49 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 05366a3..d1537fa 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1493,6 +1493,16 @@ public final class HConstants { "hbase.master.executor.logreplayops.threads"; public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10; + /** + * Number of rows in a batch operation above which a warning will be logged. + */ + public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold"; + + /** + * Default value of {@link #BATCH_ROWS_THRESHOLD_NAME} + */ + public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 9a02831..0819423 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -276,15 +276,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; - /** - * Number of rows in a batch operation above which a warning will be logged. - */ - static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold"; - /** - * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME} - */ - static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000; - protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled"; // Request counter. (Includes requests that are not serviced by regions.) @@ -1229,7 +1220,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException { this.ld = ld; regionServer = rs; - rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT); + rowSizeWarnThreshold = rs.conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, + HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); RpcSchedulerFactory rpcSchedulerFactory; try { rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 6c46a85..752cfb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -187,7 +187,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer @Override public void startReplicationService() throws IOException { this.replicationManager.init(); - this.replicationSink = new ReplicationSink(this.conf, this.server); + this.replicationSink = new ReplicationSink(this.conf); this.scheduleThreadPool.scheduleAtFixedRate( new ReplicationStatisticsTask(this.replicationSink, this.replicationManager), statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index ae0a732..76e22f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,7 +39,6 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.yetus.audience.InterfaceAudience; @@ -52,13 +52,14 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; /** * <p> @@ -92,15 +93,20 @@ public class ReplicationSink { private WALEntrySinkFilter walEntrySinkFilter; /** + * Row size threshold for multi requests above which a warning is logged + */ + private final int rowSizeWarnThreshold; + + /** * Create a sink for replication - * - * @param conf conf object - * @param stopper boolean to tell this thread to stop + * @param conf conf object * @throws IOException thrown when HDFS goes bad or bad file name */ - public ReplicationSink(Configuration conf, Stoppable stopper) + public ReplicationSink(Configuration conf) throws IOException { this.conf = HBaseConfiguration.create(conf); + rowSizeWarnThreshold = conf.getInt( + HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); decorateConf(); this.metrics = new MetricsSink(); this.walEntrySinkFilter = setupWALEntrySinkFilter(); @@ -210,11 +216,7 @@ public class ReplicationSink { // Map of table name Vs list of pair of family and list of // hfile paths from its namespace Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = - bulkLoadsPerClusters.get(bld.getClusterIdsList()); - if (bulkLoadHFileMap == null) { - bulkLoadHFileMap = new HashMap<>(); - bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap); - } + bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>()); buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); } } else { @@ -247,7 +249,7 @@ public class ReplicationSink { if (!rowMap.isEmpty()) { LOG.debug("Started replicating mutations."); for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) { - batch(entry.getKey(), entry.getValue().values()); + batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold); } LOG.debug("Finished replicating mutations."); } @@ -372,17 +374,10 @@ public class ReplicationSink { * @param value * @return the list of values corresponding to key1 and key2 */ - private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) { - Map<K2,List<V>> innerMap = map.get(key1); - if (innerMap == null) { - innerMap = new HashMap<>(); - map.put(key1, innerMap); - } - List<V> values = innerMap.get(key2); - if (values == null) { - values = new ArrayList<>(); - innerMap.put(key2, values); - } + private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, + V value) { + Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>()); + List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>()); values.add(value); return values; } @@ -410,9 +405,10 @@ public class ReplicationSink { * Do the changes and handle the pool * @param tableName table to insert into * @param allRows list of actions - * @throws IOException + * @param batchRowSizeThreshold rowSize threshold for batch mutation */ - protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException { + private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold) + throws IOException { if (allRows.isEmpty()) { return; } @@ -421,7 +417,15 @@ public class ReplicationSink { Connection connection = getConnection(); table = connection.getTable(tableName); for (List<Row> rows : allRows) { - table.batch(rows, null); + List<List<Row>> batchRows; + if (rows.size() > batchRowSizeThreshold) { + batchRows = Lists.partition(rows, batchRowSizeThreshold); + } else { + batchRows = Collections.singletonList(rows); + } + for(List<Row> rowList:batchRows){ + table.batch(rowList, null); + } } } catch (RetriesExhaustedWithDetailsException rewde) { for (Throwable ex : rewde.getCauses()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java index 8e11ed5..f41335f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -67,8 +68,8 @@ public class TestMultiLogThreshold { final TableName tableName = TableName.valueOf("tableName"); TEST_UTIL = HBaseTestingUtility.createLocalHTU(); CONF = TEST_UTIL.getConfiguration(); - THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME, - RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT); + THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, + HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); TEST_UTIL.startMiniCluster(); TEST_UTIL.createTable(tableName, TEST_FAM); RS = TEST_UTIL.getRSForFirstRegionInTable(tableName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index aa6c39c..adb077d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.security.SecureRandom; import java.util.ArrayList; @@ -55,7 +54,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -78,7 +77,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey; -@Category({ReplicationTests.class, MediumTests.class}) +@Category({ReplicationTests.class, LargeTests.class}) public class TestReplicationSink { @ClassRule @@ -127,10 +126,8 @@ public class TestReplicationSink { public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider", TestSourceFSConfigurationProvider.class.getCanonicalName()); - TEST_UTIL.startMiniCluster(3); - SINK = - new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE); + SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration())); table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1); table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2); Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration()); @@ -203,6 +200,40 @@ public class TestReplicationSink { assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length); } + @Test + public void testLargeEditsPutDelete() throws Exception { + List<WALEntry> entries = new ArrayList<>(); + List<Cell> cells = new ArrayList<>(); + for (int i = 0; i < 5510; i++) { + entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + } + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId, + baseNamespaceDir, hfileArchiveDir); + + ResultScanner resultScanner = table1.getScanner(new Scan()); + int totalRows = 0; + while (resultScanner.next() != null) { + totalRows++; + } + assertEquals(5510, totalRows); + + entries = new ArrayList<>(); + cells = new ArrayList<>(); + for (int i = 0; i < 11000; i++) { + entries.add( + createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, + cells)); + } + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId, + baseNamespaceDir, hfileArchiveDir); + resultScanner = table1.getScanner(new Scan()); + totalRows = 0; + while (resultScanner.next() != null) { + totalRows++; + } + assertEquals(5500, totalRows); + } + /** * Insert to 2 different tables * @throws Exception @@ -221,7 +252,11 @@ public class TestReplicationSink { Scan scan = new Scan(); ResultScanner scanRes = table2.getScanner(scan); for(Result res : scanRes) { - assertTrue(Bytes.toInt(res.getRow()) % 2 == 0); + assertEquals(0, Bytes.toInt(res.getRow()) % 2); + } + scanRes = table1.getScanner(scan); + for(Result res : scanRes) { + assertEquals(1, Bytes.toInt(res.getRow()) % 2); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java index 31e94d6..15ff54c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java @@ -128,7 +128,7 @@ public class TestWALEntrySinkFilter { IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); conf.setClass("hbase.client.connection.impl", DevNullConnection.class, Connection.class); - ReplicationSink sink = new ReplicationSink(conf, STOPPABLE); + ReplicationSink sink = new ReplicationSink(conf); // Create some dumb walentries. List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries = new ArrayList<>();