Repository: hbase Updated Branches: refs/heads/branch-2 c459282fe -> e50e6f7ce
HBASE-20084 Refactor the RSRpcServices#doBatchOp Signed-off-by: tedyu <yuzhih...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e50e6f7c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e50e6f7c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e50e6f7c Branch: refs/heads/branch-2 Commit: e50e6f7ce92733472a757acee2c22d7fed329aef Parents: c459282 Author: Chia-Ping Tsai <chia7...@gmail.com> Authored: Mon Feb 26 20:49:05 2018 +0800 Committer: Chia-Ping Tsai <chia7...@gmail.com> Committed: Wed Feb 28 15:13:09 2018 +0800 ---------------------------------------------------------------------- .../hbase/regionserver/RSRpcServices.java | 115 ++++++++++--------- 1 file changed, 58 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e50e6f7c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 42284e9..d0a1315 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -122,6 +122,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.Permission; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.DNS; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; @@ -762,7 +763,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Gather up 
CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do // one at a time, we instead pass them in batch. Be aware that the corresponding // ResultOrException instance that matches each Put or Delete is then added down in the - // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched + // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are + // deferred/batched List<ClientProtos.Action> mutations = null; long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); IOException sizeIOE = null; @@ -801,7 +803,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // use it for the response. // // This will create a copy in the builder. - hasResultOrException = true; NameBytesPair pair = ResponseConverter.buildException(sizeIOE); resultOrExceptionBuilder.setException(pair); context.incrementResponseExceptionSize(pair.getSerializedSize()); @@ -828,29 +829,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } else if (action.hasServiceCall()) { hasResultOrException = true; - try { - com.google.protobuf.Message result = - execServiceOnRegion(region, action.getServiceCall()); - ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = - ClientProtos.CoprocessorServiceResult.newBuilder(); - resultOrExceptionBuilder.setServiceResult( - serviceResultBuilder.setValue( - serviceResultBuilder.getValueBuilder() - .setName(result.getClass().getName()) - // TODO: Copy!!! 
- .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); - } catch (IOException ioe) { - rpcServer.getMetrics().exception(ioe); - NameBytesPair pair = ResponseConverter.buildException(ioe); - resultOrExceptionBuilder.setException(pair); - context.incrementResponseExceptionSize(pair.getSerializedSize()); - } + com.google.protobuf.Message result = + execServiceOnRegion(region, action.getServiceCall()); + ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = + ClientProtos.CoprocessorServiceResult.newBuilder(); + resultOrExceptionBuilder.setServiceResult( + serviceResultBuilder.setValue( + serviceResultBuilder.getValueBuilder() + .setName(result.getClass().getName()) + // TODO: Copy!!! + .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); } else if (action.hasMutation()) { MutationType type = action.getMutation().getMutateType(); if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && !mutations.isEmpty()) { // Flush out any Puts or Deletes already collected. - doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); + doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, + spaceQuotaEnforcement); mutations.clear(); } switch (type) { @@ -895,7 +890,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Could get to here and there was no result and no exception. Presumes we added // a Put or Delete to the collecting Mutations List for adding later. In this // case the corresponding ResultOrException instance for the Put or Delete will be added - // down in the doBatchOp method call rather than up here. + // down in the doNonAtomicBatchOp method call rather than up here. 
} catch (IOException ie) { rpcServer.getMetrics().exception(ie); hasResultOrException = true; @@ -910,18 +905,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } // Finish up any outstanding mutations - if (mutations != null && !mutations.isEmpty()) { - try { - doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); - } catch (IOException ioe) { - // TODO do the refactor to avoid this catch as it is useless - // doBatchOp has handled the IOE for all non-atomic operations. - rpcServer.getMetrics().exception(ioe); - NameBytesPair pair = ResponseConverter.buildException(ioe); - resultOrExceptionBuilder.setException(pair); - context.incrementResponseExceptionSize(pair.getSerializedSize()); - builder.addResultOrException(resultOrExceptionBuilder.build()); - } + if (!CollectionUtils.isEmpty(mutations)) { + doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); } return cellsToReturn; } @@ -942,6 +927,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, + final OperationQuota quota, final List<ClientProtos.Action> mutations, + final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) + throws IOException { + // Just throw the exception. The exception will be caught and then added to region-level + // exception for RegionAction. Leaving the null to action result is ok since the null + // result is viewed as failure by hbase client. And the region-level exception will be used + // to replace the null result. See AsyncRequestFutureImpl#receiveMultiAction and + // AsyncBatchRpcRetryingCaller#onComplete for more details. 
+ doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true); + } + + private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, + final OperationQuota quota, final List<ClientProtos.Action> mutations, + final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) { + try { + doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false); + } catch (IOException e) { + // Set the exception for each action. The mutations in the same RegionAction are grouped into + // different batches and then processed individually. Hence, we don't set the region-level + // exception here for the whole RegionAction. + for (Action mutation : mutations) { + builder.addResultOrException(getResultOrException(e, mutation.getIndex())); + } + } + } + /** * Execute a list of Put/Delete mutations. * @@ -1028,30 +1040,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler, break; } } - } catch (IOException ie) { + } finally { int processedMutationIndex = 0; for (Action mutation : mutations) { // The non-null mArray[i] means the cell scanner has been read. 
if (mArray[processedMutationIndex++] == null) { skipCellsForMutation(mutation, cells); } - if (!atomic) { - builder.addResultOrException(getResultOrException(ie, mutation.getIndex())); - } - } - if (atomic) { - throw ie; } + updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); } + } + + private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts, + boolean batchContainsDelete) { if (regionServer.metricsRegionServer != null) { long after = EnvironmentEdgeManager.currentTime(); if (batchContainsPuts) { - regionServer.metricsRegionServer.updatePutBatch( - region.getTableDescriptor().getTableName(), after - before); + regionServer.metricsRegionServer + .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime); } if (batchContainsDelete) { - regionServer.metricsRegionServer.updateDeleteBatch( - region.getTableDescriptor().getTableName(), after - before); + regionServer.metricsRegionServer + .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime); } } } @@ -1120,17 +1131,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return region.batchReplay(mutations.toArray( new WALSplitter.MutationReplay[mutations.size()]), replaySeqId); } finally { - if (regionServer.metricsRegionServer != null) { - long after = EnvironmentEdgeManager.currentTime(); - if (batchContainsPuts) { - regionServer.metricsRegionServer.updatePutBatch( - region.getTableDescriptor().getTableName(), after - before); - } - if (batchContainsDelete) { - regionServer.metricsRegionServer.updateDeleteBatch( - region.getTableDescriptor().getTableName(), after - before); - } - } + updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); } } @@ -2613,8 +2614,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, cellScanner, row, family, qualifier, op, comparator, regionActionResultBuilder, spaceQuotaEnforcement); } else { - doBatchOp(regionActionResultBuilder, 
region, quota, regionAction.getActionList(), - cellScanner, spaceQuotaEnforcement, true); + doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), + cellScanner, spaceQuotaEnforcement); processed = Boolean.TRUE; } } catch (IOException e) {