This is an automated email from the ASF dual-hosted git repository. brfrn169 pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 1f5acc1 HBASE-24515 batch Increment/Append fails when retrying the RPC 1f5acc1 is described below commit 1f5acc14f853d611056b08baf9ee6253a31a2770 Author: Toshihiro Suzuki <brfrn...@gmail.com> AuthorDate: Mon Jun 8 09:51:21 2020 +0900 HBASE-24515 batch Increment/Append fails when retrying the RPC Signed-off-by: Viraj Jasani <virajjasani...@gmail.com> --- .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 54 ---------------------- .../hadoop/hbase/regionserver/RSRpcServices.java | 31 +++++++++++-- .../hadoop/hbase/client/TestFromClientSide.java | 41 ++++++++++++++++ .../hbase/client/TestIncrementsFromClientSide.java | 41 ++++++++++++++++ 4 files changed, 109 insertions(+), 58 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index d230117..7b328ee 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -909,60 +909,6 @@ public final class ProtobufUtil { throw new IOException("Unknown mutation type " + type); } - /** - * Convert a protocol buffer Mutate to a Get. - * @param proto the protocol buffer Mutate to convert. - * @param cellScanner - * @return the converted client get. - * @throws IOException - */ - public static Get toGet(final MutationProto proto, final CellScanner cellScanner) - throws IOException { - MutationType type = proto.getMutateType(); - assert type == MutationType.INCREMENT || type == MutationType.APPEND : type.name(); - byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null; - Get get = null; - int cellCount = proto.hasAssociatedCellCount() ? 
proto.getAssociatedCellCount() : 0; - if (cellCount > 0) { - // The proto has metadata only and the data is separate to be found in the cellScanner. - if (cellScanner == null) { - throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " - + TextFormat.shortDebugString(proto)); - } - for (int i = 0; i < cellCount; i++) { - if (!cellScanner.advance()) { - throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i - + " no cell returned: " + TextFormat.shortDebugString(proto)); - } - Cell cell = cellScanner.current(); - if (get == null) { - get = new Get(CellUtil.cloneRow(cell)); - } - get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)); - } - } else { - get = new Get(row); - for (ColumnValue column : proto.getColumnValueList()) { - byte[] family = column.getFamily().toByteArray(); - for (QualifierValue qv : column.getQualifierValueList()) { - byte[] qualifier = qv.getQualifier().toByteArray(); - if (!qv.hasValue()) { - throw new DoNotRetryIOException("Missing required field: qualifier value"); - } - get.addColumn(family, qualifier); - } - } - } - if (proto.hasTimeRange()) { - TimeRange timeRange = toTimeRange(proto.getTimeRange()); - get.setTimeRange(timeRange.getMin(), timeRange.getMax()); - } - for (NameBytesPair attribute : proto.getAttributeList()) { - get.setAttribute(attribute.getName(), attribute.getValue().toByteArray()); - } - return get; - } - public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) { switch (readType) { case DEFAULT: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 2c2fb2e..05b5959 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -682,8 +682,7 @@ public class RSRpcServices 
implements HBaseRPCErrorHandler, r = region.append(append, nonceGroup, nonce); } else { // convert duplicate append to get - List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false, - nonceGroup, nonce); + List<Cell> results = region.get(toGet(append), false, nonceGroup, nonce); r = Result.create(results); } success = true; @@ -734,8 +733,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, r = region.increment(increment, nonceGroup, nonce); } else { // convert duplicate increment to get - List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup, - nonce); + List<Cell> results = region.get(toGet(increment), false, nonceGroup, nonce); r = Result.create(results); } success = true; @@ -756,6 +754,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return r == null ? Result.EMPTY_RESULT : r; } + private static Get toGet(final Mutation mutation) throws IOException { + if(!(mutation instanceof Increment) && !(mutation instanceof Append)) { + throw new AssertionError("mutation must be a instance of Increment or Append"); + } + Get get = new Get(mutation.getRow()); + CellScanner cellScanner = mutation.cellScanner(); + while (!cellScanner.advance()) { + Cell cell = cellScanner.current(); + get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)); + } + if (mutation instanceof Increment) { + // Increment + Increment increment = (Increment) mutation; + get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax()); + } else { + // Append + Append append = (Append) mutation; + get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax()); + } + for (Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) { + get.setAttribute(entry.getKey(), entry.getValue()); + } + return get; + } + /** * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when * done, add an instance of a {@link 
ResultOrException} that corresponds to each Mutation. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 6081509..9780410 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -209,6 +210,46 @@ public class TestFromClientSide { } /** + * Test batch append result when there are duplicate rpc requests. + */ + @Test + public void testDuplicateBatchAppend() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(name.getMethodName()); + Map<String, String> kvs = new HashMap<>(); + kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000"); + hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1, + kvs); + TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close(); + + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50); + // Client will retry because rpc timeout is smaller than the sleep time of first rpc call + c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500); + + try (Connection connection = ConnectionFactory.createConnection(c); + Table table = connection.getTableBuilder(TableName.valueOf(name.getMethodName()), null). 
+ setOperationTimeout(3 * 1000).build()) { + Append append = new Append(ROW); + append.addColumn(HBaseTestingUtility.fam1, QUALIFIER, VALUE); + + // Batch append + Object[] results = new Object[1]; + table.batch(Collections.singletonList(append), results); + + // Verify expected result + Cell[] cells = ((Result) results[0]).rawCells(); + assertEquals(1, cells.length); + assertKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, VALUE); + + // Verify expected result again + Result readResult = table.get(new Get(ROW)); + cells = readResult.rawCells(); + assertEquals(1, cells.length); + assertKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, VALUE); + } + } + + /** + * Basic client side validation of HBASE-4536 + */ + @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java index b1aba6a..fae6051 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -139,6 +140,46 @@ public class TestIncrementsFromClientSide { } } + /** + * Test batch increment result when there are duplicate rpc requests. 
+ */ + @Test + public void testDuplicateBatchIncrement() throws Exception { + HTableDescriptor hdt = + TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName())); + Map<String, String> kvs = new HashMap<>(); + kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000"); + hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1, + kvs); + TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close(); + + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50); + // Client will retry because rpc timeout is smaller than the sleep time of first rpc call + c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500); + + try (Connection connection = ConnectionFactory.createConnection(c); + Table table = connection.getTableBuilder(TableName.valueOf(name.getMethodName()), null) + .setOperationTimeout(3 * 1000).build()) { + Increment inc = new Increment(ROW); + inc.addColumn(HBaseTestingUtility.fam1, QUALIFIER, 1); + + // Batch increment + Object[] results = new Object[1]; + table.batch(Collections.singletonList(inc), results); + + Cell[] cells = ((Result) results[0]).rawCells(); + assertEquals(1, cells.length); + assertIncrementKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, 1); + + // Verify expected result + Result readResult = table.get(new Get(ROW)); + cells = readResult.rawCells(); + assertEquals(1, cells.length); + assertIncrementKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, 1); + } + } + @Test public void testIncrementWithDeletes() throws Exception { LOG.info("Starting " + this.name.getMethodName());