HBASE-15214 Valid mutate Ops fail with RPC Codec in use and region moves across.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7239056c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7239056c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7239056c Branch: refs/heads/hbase-12439 Commit: 7239056c78cc6eb2867c8865ab45821d3e51328a Parents: 4265bf2 Author: anoopsjohn <anoopsamj...@gmail.com> Authored: Sat Feb 6 02:40:49 2016 +0530 Committer: anoopsjohn <anoopsamj...@gmail.com> Committed: Sat Feb 6 02:40:49 2016 +0530 ---------------------------------------------------------------------- .../hadoop/hbase/protobuf/ProtobufUtil.java | 18 ++++------- .../hbase/regionserver/RSRpcServices.java | 34 ++++++++++++++++++++ .../hadoop/hbase/client/TestMultiParallel.java | 4 +++ 3 files changed, 45 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/7239056c/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index fe76780..e9a1223 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -543,7 +543,7 @@ public final class ProtobufUtil { MutationType type = proto.getMutateType(); assert type == MutationType.PUT: type.name(); long timestamp = proto.hasTimestamp()? proto.getTimestamp(): HConstants.LATEST_TIMESTAMP; - Put put = null; + Put put = proto.hasRow() ? new Put(proto.getRow().toByteArray(), timestamp) : null; int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0; if (cellCount > 0) { // The proto has metadata only and the data is separate to be found in the cellScanner. @@ -563,9 +563,7 @@ public final class ProtobufUtil { put.add(cell); } } else { - if (proto.hasRow()) { - put = new Put(proto.getRow().asReadOnlyByteBuffer(), timestamp); - } else { + if (put == null) { throw new IllegalArgumentException("row cannot be null"); } // The proto has the metadata and the data itself @@ -639,12 +637,8 @@ public final class ProtobufUtil { throws IOException { MutationType type = proto.getMutateType(); assert type == MutationType.DELETE : type.name(); - byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null; - long timestamp = HConstants.LATEST_TIMESTAMP; - if (proto.hasTimestamp()) { - timestamp = proto.getTimestamp(); - } - Delete delete = null; + long timestamp = proto.hasTimestamp() ? proto.getTimestamp() : HConstants.LATEST_TIMESTAMP; + Delete delete = proto.hasRow() ? new Delete(proto.getRow().toByteArray(), timestamp) : null; int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0; if (cellCount > 0) { // The proto has metadata only and the data is separate to be found in the cellScanner. @@ -667,7 +661,9 @@ public final class ProtobufUtil { delete.addDeleteMarker(cell); } } else { - delete = new Delete(row, timestamp); + if (delete == null) { + throw new IllegalArgumentException("row cannot be null"); + } for (ColumnValue column: proto.getColumnValueList()) { byte[] family = column.getFamily().toByteArray(); for (QualifierValue qv: column.getQualifierValueList()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/7239056c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 3e133c4..e346c34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -129,6 +129,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; @@ -696,6 +697,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, setException(ResponseConverter.buildException(sizeIOE)); resultOrExceptionBuilder.setIndex(action.getIndex()); builder.addResultOrException(resultOrExceptionBuilder.build()); + if (cellScanner != null) { + skipCellsForMutation(action, cellScanner); + } continue; } if (action.hasGet()) { @@ -2239,6 +2243,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rpcServer.getMetrics().exception(e); regionActionResultBuilder.setException(ResponseConverter.buildException(e)); responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); + // All Mutations in this RegionAction not executed as we can not see the Region online here + // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner + // corresponding to these Mutations. + if (cellScanner != null) { + skipCellsForMutations(regionAction.getActionList(), cellScanner); + } continue; // For this region it's a failure. } @@ -2296,6 +2306,30 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return responseBuilder.build(); } + private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) { + for (Action action : actions) { + skipCellsForMutation(action, cellScanner); + } + } + + private void skipCellsForMutation(Action action, CellScanner cellScanner) { + try { + if (action.hasMutation()) { + MutationProto m = action.getMutation(); + if (m.hasAssociatedCellCount()) { + for (int i = 0; i < m.getAssociatedCellCount(); i++) { + cellScanner.advance(); + } + } + } + } catch (IOException e) { + // No need to handle these Individual Muatation level issue. Any way this entire RegionAction + // marked as failed as we could not see the Region here. At client side the top level + // RegionAction exception will be considered first. + LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e); + } + } + /** * Mutate data in a table. * http://git-wip-us.apache.org/repos/asf/hbase/blob/7239056c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index af3a54e..d295ab2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -35,10 +35,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.exceptions.OperationConflictException; import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -75,6 +77,8 @@ public class TestMultiParallel { //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); + UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, + KeyValueCodec.class.getCanonicalName()); UTIL.startMiniCluster(slaves); Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY)); UTIL.waitTableEnabled(TEST_TABLE);