HBASE-17174 Refactor the AsyncProcess, BufferedMutatorImpl, and HTable Signed-off-by: zhangduo <zhang...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8cb55c40 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8cb55c40 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8cb55c40 Branch: refs/heads/hbase-12439 Commit: 8cb55c4080206a651023f6d042fac295192f1c2b Parents: 992e571 Author: ChiaPing Tsai <chia7...@gmail.com> Authored: Sat Dec 24 12:02:05 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Sat Dec 24 12:02:05 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 704 +++-------------- .../hadoop/hbase/client/AsyncProcessTask.java | 229 ++++++ .../hbase/client/AsyncRequestFutureImpl.java | 46 +- .../hbase/client/BufferedMutatorImpl.java | 165 ++-- .../hbase/client/BufferedMutatorParams.java | 21 +- .../hbase/client/ConnectionConfiguration.java | 19 +- .../hbase/client/ConnectionImplementation.java | 13 +- .../org/apache/hadoop/hbase/client/HTable.java | 295 ++++--- .../hadoop/hbase/client/HTableMultiplexer.java | 15 +- .../hadoop/hbase/client/RequestController.java | 125 +++ .../hbase/client/RequestControllerFactory.java | 44 ++ .../apache/hadoop/hbase/client/RowAccess.java | 3 +- .../hbase/client/SimpleRequestController.java | 519 +++++++++++++ .../hadoop/hbase/client/TestAsyncProcess.java | 769 ++++++++----------- .../client/TestSimpleRequestController.java | 336 ++++++++ .../hbase/client/HConnectionTestingUtility.java | 7 +- .../hadoop/hbase/client/TestClientPushback.java | 19 +- .../hadoop/hbase/client/TestReplicasClient.java | 14 +- .../regionserver/TestPerColumnFamilyFlush.java | 1 - .../security/access/TestTablePermissions.java | 72 +- 20 files changed, 2128 insertions(+), 1288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 50a2a11..d1583f5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -19,45 +19,35 @@ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; -import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows; +import org.apache.hadoop.hbase.client.RequestController.ReturnCode; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdge; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * This class allows a continuous flow of requests. It's written to be compatible with a @@ -95,9 +85,10 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; * </p> */ @InterfaceAudience.Private +@InterfaceStability.Evolving class AsyncProcess { private static final Log LOG = LogFactory.getLog(AsyncProcess.class); - protected static final AtomicLong COUNTER = new AtomicLong(); + private static final AtomicLong COUNTER = new AtomicLong(); public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; @@ -116,31 +107,6 @@ class AsyncProcess { */ public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details"; - protected final int thresholdToLogUndoneTaskDetails; - private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = - "hbase.client.threshold.log.details"; - private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; - private static final int THRESHOLD_TO_LOG_REGION_DETAILS = 2; - - /** - * The maximum size of single RegionServer. - */ - public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize"; - - /** - * Default value of #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE - */ - public static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304; - - /** - * The maximum size of submit. - */ - public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize"; - /** - * Default value of #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE - */ - public static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE; - /** * Return value from a submit that didn't contain any requests. */ @@ -173,64 +139,42 @@ class AsyncProcess { }; // TODO: many of the fields should be made private - protected final long id; - - protected final ClusterConnection connection; - protected final RpcRetryingCallerFactory rpcCallerFactory; - protected final RpcControllerFactory rpcFactory; - protected final BatchErrors globalErrors; - protected final ExecutorService pool; - - protected final AtomicLong tasksInProgress = new AtomicLong(0); - protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion = - new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR); - protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = - new ConcurrentHashMap<ServerName, AtomicInteger>(); - // Start configuration settings. - protected final int startLogErrorsCnt; + final long id; - /** - * The number of tasks simultaneously executed on the cluster. - */ - protected final int maxTotalConcurrentTasks; + final ClusterConnection connection; + private final RpcRetryingCallerFactory rpcCallerFactory; + final RpcControllerFactory rpcFactory; + final BatchErrors globalErrors; - /** - * The max heap size of all tasks simultaneously executed on a server. - */ - protected final long maxHeapSizePerRequest; - protected final long maxHeapSizeSubmit; - /** - * The number of tasks we run in parallel on a single region. - * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start - * a set of operations on a region before the previous one is done. As well, this limits - * the pressure we put on the region server. - */ - protected final int maxConcurrentTasksPerRegion; + // Start configuration settings. + final int startLogErrorsCnt; - /** - * The number of task simultaneously executed on a single region server. - */ - protected final int maxConcurrentTasksPerServer; - protected final long pause; - protected final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified - protected int numTries; - protected int serverTrackerTimeout; - protected int rpcTimeout; - protected int operationTimeout; - protected long primaryCallTimeoutMicroseconds; + final long pause; + final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified + final int numTries; + @VisibleForTesting + int serverTrackerTimeout; + final long primaryCallTimeoutMicroseconds; /** Whether to log details for batch errors */ - protected final boolean logBatchErrorDetails; + final boolean logBatchErrorDetails; // End configuration settings. - public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, + /** + * The traffic control for requests. + */ + @VisibleForTesting + final RequestController requestController; + public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms"; + private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000; + private final int periodToLog; + AsyncProcess(ClusterConnection hc, Configuration conf, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, - RpcControllerFactory rpcFactory, int rpcTimeout, int operationTimeout) { + RpcControllerFactory rpcFactory) { if (hc == null) { throw new IllegalArgumentException("ClusterConnection cannot be null."); } this.connection = hc; - this.pool = pool; this.globalErrors = useGlobalErrors ? new BatchErrors() : null; this.id = COUNTER.incrementAndGet(); @@ -249,42 +193,10 @@ class AsyncProcess { // how many times we could try in total, one more than retry number this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; - this.rpcTimeout = rpcTimeout; - this.operationTimeout = operationTimeout; this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); - - this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, - HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); - this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, - HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); - this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, - HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); - this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, - DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); - this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE); this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); - - if (this.maxTotalConcurrentTasks <= 0) { - throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks); - } - if (this.maxConcurrentTasksPerServer <= 0) { - throw new IllegalArgumentException("maxConcurrentTasksPerServer=" + - maxConcurrentTasksPerServer); - } - if (this.maxConcurrentTasksPerRegion <= 0) { - throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" + - maxConcurrentTasksPerRegion); - } - if (this.maxHeapSizePerRequest <= 0) { - throw new IllegalArgumentException("maxHeapSizePerServer=" + - maxHeapSizePerRequest); - } - - if (this.maxHeapSizeSubmit <= 0) { - throw new IllegalArgumentException("maxHeapSizeSubmit=" + - maxHeapSizeSubmit); - } + this.periodToLog = conf.getInt(LOG_DETAILS_PERIOD, DEFAULT_LOG_DETAILS_PERIOD); // Server tracker allows us to do faster, and yet useful (hopefully), retries. // However, if we are too useful, we might fail very quickly due to retry count limit. // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum @@ -301,43 +213,30 @@ class AsyncProcess { this.rpcFactory = rpcFactory; this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false); - this.thresholdToLogUndoneTaskDetails = - conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS, - DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS); + this.requestController = RequestControllerFactory.create(conf); } /** - * @return pool if non null, otherwise returns this.pool if non null, otherwise throws - * RuntimeException + * The submitted task may be not accomplished at all if there are too many running tasks or + * other limits. + * @param <CResult> The class to cast the result + * @param task The setting and data + * @return AsyncRequestFuture */ - protected ExecutorService getPool(ExecutorService pool) { - if (pool != null) { - return pool; - } - if (this.pool != null) { - return this.pool; + public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task) throws InterruptedIOException { + AsyncRequestFuture reqFuture = checkTask(task); + if (reqFuture != null) { + return reqFuture; + } + SubmittedRows submittedRows = task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows(); + switch (submittedRows) { + case ALL: + return submitAll(task); + case AT_LEAST_ONE: + return submit(task, true); + default: + return submit(task, false); } - throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService"); - } - - /** - * See #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean). - * Uses default ExecutorService for this AP (must have been created with one). - */ - public <CResult> AsyncRequestFuture submit(TableName tableName, - final RowAccess<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, - boolean needResults) throws InterruptedIOException { - return submit(null, tableName, rows, atLeastOne, callback, needResults); - } - /** - * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}. - * Uses the {@link ListRowAccess} to wrap the {@link List}. - */ - public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, - List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, - boolean needResults) throws InterruptedIOException { - return submit(pool, tableName, new ListRowAccess(rows), atLeastOne, - callback, needResults); } /** @@ -345,20 +244,13 @@ class AsyncProcess { * list. Does not send requests to replicas (not currently used for anything other * than streaming puts anyway). * - * @param pool ExecutorService to use. - * @param tableName The table for which this request is needed. - * @param callback Batch callback. Only called on success (94 behavior). - * @param needResults Whether results are needed, or can be discarded. - * @param rows - the submitted row. Modified by the method: we remove the rows we took. + * @param task The setting and data * @param atLeastOne true if we should submit at least a subset. */ - public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, - RowAccess<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, - boolean needResults) throws InterruptedIOException { - if (rows.isEmpty()) { - return NO_REQS_RESULT; - } - + private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task, + boolean atLeastOne) throws InterruptedIOException { + TableName tableName = task.getTableName(); + RowAccess<? extends Row> rows = task.getRowAccess(); Map<ServerName, MultiAction> actionsByServer = new HashMap<ServerName, MultiAction>(); List<Action> retainedActions = new ArrayList<Action>(rows.size()); @@ -369,11 +261,11 @@ class AsyncProcess { // Location errors that happen before we decide what requests to take. List<Exception> locationErrors = null; List<Integer> locationErrorRows = null; - RowCheckerHost checker = createRowCheckerHost(); + RequestController.Checker checker = requestController.newChecker(); boolean firstIter = true; do { // Wait until there is at least one slot for a new task. - waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString()); + requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1)); int posInList = -1; if (!firstIter) { checker.reset(); @@ -406,8 +298,7 @@ class AsyncProcess { it.remove(); break; // Backward compat: we stop considering actions on location error. } - long rowSize = (r instanceof Mutation) ? ((Mutation) r).heapSize() : 0; - ReturnCode code = checker.canTakeOperation(loc, rowSize); + ReturnCode code = checker.canTakeRow(loc, r); if (code == ReturnCode.END) { break; } @@ -426,29 +317,14 @@ class AsyncProcess { if (retainedActions.isEmpty()) return NO_REQS_RESULT; - return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults, - locationErrors, locationErrorRows, actionsByServer, pool); + return submitMultiActions(task, retainedActions, nonceGroup, + locationErrors, locationErrorRows, actionsByServer); } - private RowCheckerHost createRowCheckerHost() { - return new RowCheckerHost(Arrays.asList( - new TaskCountChecker(maxTotalConcurrentTasks, - maxConcurrentTasksPerServer, - maxConcurrentTasksPerRegion, - tasksInProgress, - taskCounterPerServer, - taskCounterPerRegion) - , new RequestSizeChecker(maxHeapSizePerRequest) - , new SubmittedSizeChecker(maxHeapSizeSubmit) - )); - } - <CResult> AsyncRequestFuture submitMultiActions(TableName tableName, - List<Action> retainedActions, long nonceGroup, Batch.Callback<CResult> callback, - Object[] results, boolean needResults, List<Exception> locationErrors, - List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer, - ExecutorService pool) { - AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( - tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1); + <CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task, + List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors, + List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) { + AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(task, retainedActions, nonceGroup); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { @@ -462,14 +338,6 @@ class AsyncProcess { return ars; } - public void setRpcTimeout(int rpcTimeout) { - this.rpcTimeout = rpcTimeout; - } - - public void setOperationTimeout(int operationTimeout) { - this.operationTimeout = operationTimeout; - } - /** * Helper that is used when grouping the actions per region server. * @@ -493,24 +361,13 @@ class AsyncProcess { multiAction.add(regionName, action); } - public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, - List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { - return submitAll(pool, tableName, rows, callback, results, null, -1); - } /** * Submit immediately the list of rows, whatever the server status. Kept for backward * compatibility: it allows to be used with the batch interface that return an array of objects. - * - * @param pool ExecutorService to use. - * @param tableName name of the table for which the submission is made. - * @param rows the list of rows. - * @param callback the callback. - * @param results Optional array to return the results thru; backward compat. - * @param rpcTimeout rpc timeout for this batch, set -1 if want to use current setting. + * @param task The setting and data */ - public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, - List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results, - CancellableRegionServerCallable callable, int rpcTimeout) { + private <CResult> AsyncRequestFuture submitAll(AsyncProcessTask task) { + RowAccess<? extends Row> rows = task.getRowAccess(); List<Action> actions = new ArrayList<Action>(rows.size()); // The position will be used by the processBatch to match the object array returned. @@ -528,93 +385,78 @@ class AsyncProcess { setNonce(ng, r, action); actions.add(action); } - AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( - tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, - callable, rpcTimeout); + AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(task, actions, ng.getNonceGroup()); ars.groupAndSendMultiAction(actions, 1); return ars; } + private <CResult> AsyncRequestFuture checkTask(AsyncProcessTask<CResult> task) { + if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) { + return NO_REQS_RESULT; + } + Objects.requireNonNull(task.getPool(), "The pool can't be NULL"); + checkOperationTimeout(task.getOperationTimeout()); + checkRpcTimeout(task.getRpcTimeout()); + return null; + } + private void setNonce(NonceGenerator ng, Row r, Action action) { if (!(r instanceof Append) && !(r instanceof Increment)) return; action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. } - protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( - TableName tableName, List<Action> actions, long nonceGroup, ExecutorService pool, - Batch.Callback<CResult> callback, Object[] results, boolean needResults, - CancellableRegionServerCallable callable, int rpcTimeout) { - return new AsyncRequestFutureImpl<CResult>( - tableName, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, operationTimeout, - rpcTimeout > 0 ? rpcTimeout : this.rpcTimeout, this); + private int checkTimeout(String name, int timeout) { + if (timeout < 0) { + throw new RuntimeException("The " + name + " must be bigger than zero," + + "current value is" + timeout); + } + return timeout; + } + private int checkOperationTimeout(int operationTimeout) { + return checkTimeout("operation timeout", operationTimeout); + } + + private int checkRpcTimeout(int rpcTimeout) { + return checkTimeout("rpc timeout", rpcTimeout); + } + + @VisibleForTesting + <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( + AsyncProcessTask task, List<Action> actions, long nonceGroup) { + return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this); } /** Wait until the async does not have more than max tasks in progress. */ - protected void waitForMaximumCurrentTasks(int max, String tableName) + protected void waitForMaximumCurrentTasks(int max, TableName tableName) throws InterruptedIOException { - waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName); + requestController.waitForMaximumCurrentTasks(max, id, periodToLog, + getLogger(tableName, max)); } - // Break out this method so testable - @VisibleForTesting - void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id, - String tableName) throws InterruptedIOException { - long lastLog = EnvironmentEdgeManager.currentTime(); - long currentInProgress, oldInProgress = Long.MAX_VALUE; - while ((currentInProgress = tasksInProgress.get()) > max) { - if (oldInProgress != currentInProgress) { // Wait for in progress to change. - long now = EnvironmentEdgeManager.currentTime(); - if (now > lastLog + 10000) { - lastLog = now; - LOG.info("#" + id + ", waiting for some tasks to finish. Expected max=" - + max + ", tasksInProgress=" + currentInProgress + - " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName); - if (currentInProgress <= thresholdToLogUndoneTaskDetails) { - logDetailsOfUndoneTasks(currentInProgress); - } - } - } - oldInProgress = currentInProgress; - try { - synchronized (tasksInProgress) { - if (tasksInProgress.get() == oldInProgress) { - tasksInProgress.wait(10); - } - } - } catch (InterruptedException e) { - throw new InterruptedIOException("#" + id + ", interrupted." + - " currentNumberOfTask=" + currentInProgress); - } - } + private Consumer<Long> getLogger(TableName tableName, long max) { + return (currentInProgress) -> { + LOG.info("#" + id + (max < 0 ? ", waiting for any free slot" + : ", waiting for some tasks to finish. Expected max=" + + max) + ", tasksInProgress=" + currentInProgress + + " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName); + }; } - void logDetailsOfUndoneTasks(long taskInProgress) { - ArrayList<ServerName> servers = new ArrayList<ServerName>(); - for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) { - if (entry.getValue().get() > 0) { - servers.add(entry.getKey()); - } - } - LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers); - if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) { - ArrayList<String> regions = new ArrayList<String>(); - for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) { - if (entry.getValue().get() > 0) { - regions.add(Bytes.toString(entry.getKey())); - } - } - LOG.info("Regions against which left over task(s) are processed: " + regions); - } + void incTaskCounters(Collection<byte[]> regions, ServerName sn) { + requestController.incTaskCounters(regions, sn); } + + void decTaskCounters(Collection<byte[]> regions, ServerName sn) { + requestController.decTaskCounters(regions, sn); + } /** * Only used w/useGlobalErrors ctor argument, for HTable backward compat. * @return Whether there were any errors in any request since the last time * {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or AP was created. */ public boolean hasError() { - return globalErrors.hasErrors(); + return globalErrors != null && globalErrors.hasErrors(); } /** @@ -628,9 +470,9 @@ class AsyncProcess { * was called, or AP was created. */ public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( - List<Row> failedRows, String tableName) throws InterruptedIOException { + List<Row> failedRows, TableName tableName) throws InterruptedIOException { waitForMaximumCurrentTasks(0, tableName); - if (!globalErrors.hasErrors()) { + if (globalErrors == null || !globalErrors.hasErrors()) { return null; } if (failedRows != null) { @@ -642,41 +484,12 @@ class AsyncProcess { } /** - * increment the tasks counters for a given set of regions. MT safe. - */ - protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) { - tasksInProgress.incrementAndGet(); - - computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet(); - - for (byte[] regBytes : regions) { - computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet(); - } - } - - /** - * Decrements the counters for a given region and the region server. MT Safe. - */ - protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) { - for (byte[] regBytes : regions) { - AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); - regionCnt.decrementAndGet(); - } - - taskCounterPerServer.get(sn).decrementAndGet(); - tasksInProgress.decrementAndGet(); - synchronized (tasksInProgress) { - tasksInProgress.notifyAll(); - } - } - - /** * Create a caller. Isolated to be easily overridden in the tests. */ @VisibleForTesting protected RpcRetryingCaller<AbstractResponse> createCaller( CancellableRegionServerCallable callable, int rpcTimeout) { - return rpcCallerFactory.<AbstractResponse> newCaller(rpcTimeout); + return rpcCallerFactory.<AbstractResponse> newCaller(checkRpcTimeout(rpcTimeout)); } @@ -687,7 +500,7 @@ class AsyncProcess { * We may benefit from connection-wide tracking of server errors. * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection */ - protected ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { + ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { return new ConnectionImplementation.ServerErrorTracker( this.serverTrackerTimeout, this.numTries); } @@ -696,283 +509,4 @@ class AsyncProcess { return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE); } - /** - * Collect all advices from checkers and make the final decision. - */ - @VisibleForTesting - static class RowCheckerHost { - private final List<RowChecker> checkers; - private boolean isEnd = false; - RowCheckerHost(final List<RowChecker> checkers) { - this.checkers = checkers; - } - void reset() throws InterruptedIOException { - isEnd = false; - InterruptedIOException e = null; - for (RowChecker checker : checkers) { - try { - checker.reset(); - } catch (InterruptedIOException ex) { - e = ex; - } - } - if (e != null) { - throw e; - } - } - ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { - if (isEnd) { - return ReturnCode.END; - } - ReturnCode code = ReturnCode.INCLUDE; - for (RowChecker checker : checkers) { - switch (checker.canTakeOperation(loc, rowSize)) { - case END: - isEnd = true; - code = ReturnCode.END; - break; - case SKIP: - code = ReturnCode.SKIP; - break; - case INCLUDE: - default: - break; - } - if (code == ReturnCode.END) { - break; - } - } - for (RowChecker checker : checkers) { - checker.notifyFinal(code, loc, rowSize); - } - return code; - } - } - - /** - * Provide a way to control the flow of rows iteration. - */ - // Visible for Testing. Adding @VisibleForTesting here doesn't work for some reason. - interface RowChecker { - enum ReturnCode { - /** - * Accept current row. - */ - INCLUDE, - /** - * Skip current row. - */ - SKIP, - /** - * No more row can be included. - */ - END - }; - ReturnCode canTakeOperation(HRegionLocation loc, long rowSize); - /** - * Add the final ReturnCode to the checker. - * The ReturnCode may be reversed, so the checker need the final decision to update - * the inner state. - */ - void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize); - /** - * Reset the inner state. - */ - void reset() throws InterruptedIOException ; - } - - /** - * limit the heapsize of total submitted data. - * Reduce the limit of heapsize for submitting quickly - * if there is no running task. - */ - @VisibleForTesting - static class SubmittedSizeChecker implements RowChecker { - private final long maxHeapSizeSubmit; - private long heapSize = 0; - SubmittedSizeChecker(final long maxHeapSizeSubmit) { - this.maxHeapSizeSubmit = maxHeapSizeSubmit; - } - @Override - public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { - if (heapSize >= maxHeapSizeSubmit) { - return ReturnCode.END; - } - return ReturnCode.INCLUDE; - } - - @Override - public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) { - if (code == ReturnCode.INCLUDE) { - heapSize += rowSize; - } - } - - @Override - public void reset() { - heapSize = 0; - } - } - /** - * limit the max number of tasks in an AsyncProcess. - */ - @VisibleForTesting - static class TaskCountChecker implements RowChecker { - private static final long MAX_WAITING_TIME = 1000; //ms - private final Set<HRegionInfo> regionsIncluded = new HashSet<>(); - private final Set<ServerName> serversIncluded = new HashSet<>(); - private final int maxConcurrentTasksPerRegion; - private final int maxTotalConcurrentTasks; - private final int maxConcurrentTasksPerServer; - private final Map<byte[], AtomicInteger> taskCounterPerRegion; - private final Map<ServerName, AtomicInteger> taskCounterPerServer; - private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); - private final AtomicLong tasksInProgress; - TaskCountChecker(final int maxTotalConcurrentTasks, - final int maxConcurrentTasksPerServer, - final int maxConcurrentTasksPerRegion, - final AtomicLong tasksInProgress, - final Map<ServerName, AtomicInteger> taskCounterPerServer, - final Map<byte[], AtomicInteger> taskCounterPerRegion) { - this.maxTotalConcurrentTasks = maxTotalConcurrentTasks; - this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion; - this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer; - this.taskCounterPerRegion = taskCounterPerRegion; - this.taskCounterPerServer = taskCounterPerServer; - this.tasksInProgress = tasksInProgress; - } - @Override - public void reset() throws InterruptedIOException { - // prevent the busy-waiting - waitForRegion(); - regionsIncluded.clear(); - serversIncluded.clear(); - busyRegions.clear(); - } - private void waitForRegion() throws InterruptedIOException { - if (busyRegions.isEmpty()) { - return; - } - EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); - final long start = ee.currentTime(); - while ((ee.currentTime() - start) <= MAX_WAITING_TIME) { - for (byte[] region : busyRegions) { - AtomicInteger count = taskCounterPerRegion.get(region); - if (count == null || count.get() < maxConcurrentTasksPerRegion) { - return; - } - } - try { - synchronized (tasksInProgress) { - tasksInProgress.wait(10); - } - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted." + - " tasksInProgress=" + tasksInProgress); - } - } - } - /** - * 1) check the regions is allowed. - * 2) check the concurrent tasks for regions. - * 3) check the total concurrent tasks. - * 4) check the concurrent tasks for server. - * @param loc - * @param rowSize - * @return - */ - @Override - public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { - - HRegionInfo regionInfo = loc.getRegionInfo(); - if (regionsIncluded.contains(regionInfo)) { - // We already know what to do with this region. - return ReturnCode.INCLUDE; - } - AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); - if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { - // Too many tasks on this region already. - return ReturnCode.SKIP; - } - int newServers = serversIncluded.size() - + (serversIncluded.contains(loc.getServerName()) ? 0 : 1); - if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) { - // Too many tasks. - return ReturnCode.SKIP; - } - AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); - if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) { - // Too many tasks for this individual server - return ReturnCode.SKIP; - } - return ReturnCode.INCLUDE; - } - - @Override - public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) { - if (code == ReturnCode.INCLUDE) { - regionsIncluded.add(loc.getRegionInfo()); - serversIncluded.add(loc.getServerName()); - } - busyRegions.add(loc.getRegionInfo().getRegionName()); - } - } - - /** - * limit the request size for each regionserver. - */ - @VisibleForTesting - static class RequestSizeChecker implements RowChecker { - private final long maxHeapSizePerRequest; - private final Map<ServerName, Long> serverRequestSizes = new HashMap<>(); - RequestSizeChecker(final long maxHeapSizePerRequest) { - this.maxHeapSizePerRequest = maxHeapSizePerRequest; - } - @Override - public void reset() { - serverRequestSizes.clear(); - } - @Override - public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { - // Is it ok for limit of request size? - long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ? - serverRequestSizes.get(loc.getServerName()) : 0L; - // accept at least one request - if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) { - return ReturnCode.INCLUDE; - } - return ReturnCode.SKIP; - } - - @Override - public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) { - if (code == ReturnCode.INCLUDE) { - long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ? - serverRequestSizes.get(loc.getServerName()) : 0L; - serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize); - } - } - } - - public static class ListRowAccess<T> implements RowAccess<T> { - private final List<T> data; - ListRowAccess(final List<T> data) { - this.data = data; - } - - @Override - public int size() { - return data.size(); - } - - @Override - public boolean isEmpty() { - return data.isEmpty(); - } - - @Override - public Iterator<T> iterator() { - return data.iterator(); - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java new file mode 100644 index 0000000..eda1db2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java @@ -0,0 +1,229 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.coprocessor.Batch; + +/** + * Contains the attributes of a task which will be executed + * by {@link org.apache.hadoop.hbase.client.AsyncProcess}. + * The attributes will be validated by AsyncProcess. + * It's intended for advanced client applications. + * @param <T> The type of response from server-side + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class AsyncProcessTask<T> { + /** + * The number of processed rows. + * The AsyncProcess has traffic control which may reject some rows. + */ + public enum SubmittedRows { + ALL, + AT_LEAST_ONE, + NORMAL + } + public static <T> Builder<T> newBuilder(final Batch.Callback<T> callback) { + return new Builder<>(callback); + } + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder<T> { + + private ExecutorService pool; + private TableName tableName; + private RowAccess<? extends Row> rows; + private SubmittedRows submittedRows = SubmittedRows.ALL; + private Batch.Callback<T> callback; + private boolean needResults; + private int rpcTimeout; + private int operationTimeout; + private CancellableRegionServerCallable callable; + private Object[] results; + + private Builder() { + } + + private Builder(Batch.Callback<T> callback) { + this.callback = callback; + } + + Builder<T> setResults(Object[] results) { + this.results = results; + if (results != null && results.length != 0) { + setNeedResults(true); + } + return this; + } + + public Builder<T> setPool(ExecutorService pool) { + this.pool = pool; + return this; + } + + public Builder<T> setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + return this; + } + + public Builder<T> setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + return this; + } + + public Builder<T> setTableName(TableName tableName) { + this.tableName = tableName; + return this; + } + + public Builder<T> setRowAccess(List<? extends Row> rows) { + this.rows = new ListRowAccess<>(rows); + return this; + } + + public Builder<T> setRowAccess(RowAccess<? extends Row> rows) { + this.rows = rows; + return this; + } + + public Builder<T> setSubmittedRows(SubmittedRows submittedRows) { + this.submittedRows = submittedRows; + return this; + } + + public Builder<T> setNeedResults(boolean needResults) { + this.needResults = needResults; + return this; + } + + Builder<T> setCallable(CancellableRegionServerCallable callable) { + this.callable = callable; + return this; + } + + public AsyncProcessTask<T> build() { + return new AsyncProcessTask<>(pool, tableName, rows, submittedRows, + callback, callable, needResults, rpcTimeout, operationTimeout, results); + } + } + private final ExecutorService pool; + private final TableName tableName; + private final RowAccess<? extends Row> rows; + private final SubmittedRows submittedRows; + private final Batch.Callback<T> callback; + private final CancellableRegionServerCallable callable; + private final boolean needResults; + private final int rpcTimeout; + private final int operationTimeout; + private final Object[] results; + AsyncProcessTask(AsyncProcessTask<T> task) { + this(task.getPool(), task.getTableName(), task.getRowAccess(), + task.getSubmittedRows(), task.getCallback(), task.getCallable(), + task.getNeedResults(), task.getRpcTimeout(), task.getOperationTimeout(), + task.getResults()); + } + AsyncProcessTask(ExecutorService pool, TableName tableName, + RowAccess<? extends Row> rows, SubmittedRows size, Batch.Callback<T> callback, + CancellableRegionServerCallable callable, boolean needResults, + int rpcTimeout, int operationTimeout, Object[] results) { + this.pool = pool; + this.tableName = tableName; + this.rows = rows; + this.submittedRows = size; + this.callback = callback; + this.callable = callable; + this.needResults = needResults; + this.rpcTimeout = rpcTimeout; + this.operationTimeout = operationTimeout; + this.results = results; + } + + public int getOperationTimeout() { + return operationTimeout; + } + + public ExecutorService getPool() { + return pool; + } + + public TableName getTableName() { + return tableName; + } + + public RowAccess<? extends Row> getRowAccess() { + return rows; + } + + public SubmittedRows getSubmittedRows() { + return submittedRows; + } + + public Batch.Callback<T> getCallback() { + return callback; + } + + CancellableRegionServerCallable getCallable() { + return callable; + } + + Object[] getResults() { + return results; + } + + public boolean getNeedResults() { + return needResults; + } + + public int getRpcTimeout() { + return rpcTimeout; + } + + static class ListRowAccess<T> implements RowAccess<T> { + + private final List<T> data; + + ListRowAccess(final List<T> data) { + this.data = data; + } + + @Override + public int size() { + return data.size(); + } + + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + + @Override + public Iterator<T> iterator() { + return data.iterator(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index d176ce1..036196e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -300,11 +300,11 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { private final int[] replicaGetIndices; private final boolean hasAnyReplicaGets; private final long nonceGroup; - private CancellableRegionServerCallable currentCallable; - private int operationTimeout; - private int rpcTimeout; + private final CancellableRegionServerCallable currentCallable; + private final int operationTimeout; + private final int rpcTimeout; private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>(); - protected AsyncProcess asyncProcess; + private final AsyncProcess asyncProcess; /** * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only @@ -339,32 +339,27 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { } } - - - public AsyncRequestFutureImpl(TableName tableName, List<Action> actions, long nonceGroup, - ExecutorService pool, boolean needResults, Object[] results, Batch.Callback<CResult> callback, - CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout, - AsyncProcess asyncProcess) { - this.pool = pool; - this.callback = callback; + public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, + long nonceGroup, AsyncProcess asyncProcess) { + this.pool = task.getPool(); + this.callback = task.getCallback(); this.nonceGroup = nonceGroup; - this.tableName = tableName; + this.tableName = task.getTableName(); this.actionsInProgress.set(actions.size()); - if (results != null) { - assert needResults; - if (results.length != actions.size()) { + if (task.getResults() == null) { + results = task.getNeedResults() ? new Object[actions.size()] : null; + } else { + if (task.getResults().length != actions.size()) { throw new AssertionError("results.length"); } - this.results = results; + this.results = task.getResults(); for (int i = 0; i != this.results.length; ++i) { results[i] = null; } - } else { - this.results = needResults ? new Object[actions.size()] : null; } List<Integer> replicaGetIndices = null; boolean hasAnyReplicaGets = false; - if (needResults) { + if (results != null) { // Check to see if any requests might require replica calls. // We expect that many requests will consist of all or no multi-replica gets; in such // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will @@ -414,10 +409,10 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { this.errorsByServer = createServerErrorTracker(); this.errors = (asyncProcess.globalErrors != null) ? asyncProcess.globalErrors : new BatchErrors(); - this.operationTimeout = operationTimeout; - this.rpcTimeout = rpcTimeout; - this.currentCallable = callable; - if (callable == null) { + this.operationTimeout = task.getOperationTimeout(); + this.rpcTimeout = task.getRpcTimeout(); + this.currentCallable = task.getCallable(); + if (task.getCallable() == null) { tracker = new RetryingTimeTracker().start(); } } @@ -1246,9 +1241,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { lastLog = now; LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress + " actions to finish on table: " + tableName); - if (currentInProgress <= asyncProcess.thresholdToLogUndoneTaskDetails) { - asyncProcess.logDetailsOfUndoneTasks(currentInProgress); - } } } synchronized (actionsInProgress) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 0085767..2a55de9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -19,12 +19,9 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.Collections; @@ -36,6 +33,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; /** * <p> @@ -67,61 +66,70 @@ public class BufferedMutatorImpl implements BufferedMutator { "hbase.client.bufferedmutator.classname"; private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class); - + private final ExceptionListener listener; - protected ClusterConnection connection; // non-final so can be overridden in test private final TableName tableName; - private volatile Configuration conf; - - @VisibleForTesting - final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>(); - @VisibleForTesting - AtomicLong currentWriteBufferSize = new AtomicLong(0); + private final Configuration conf; + private final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<>(); + private final AtomicLong currentWriteBufferSize = new AtomicLong(0); /** * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}. * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation. */ - @VisibleForTesting - AtomicInteger undealtMutationCount = new AtomicInteger(0); - private long writeBufferSize; + private final AtomicInteger undealtMutationCount = new AtomicInteger(0); + private volatile long writeBufferSize; private final int maxKeyValueSize; - private boolean closed = false; private final ExecutorService pool; - private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor - private int operationTimeout; + private final AtomicInteger rpcTimeout; + private final AtomicInteger operationTimeout; + private final boolean cleanupPoolOnClose; + private volatile boolean closed = false; + private final AsyncProcess ap; @VisibleForTesting - protected AsyncProcess ap; // non-final so can be overridden in test - - BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, - RpcControllerFactory rpcFactory, BufferedMutatorParams params) { + BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) { if (conn == null || conn.isClosed()) { throw new IllegalArgumentException("Connection is null or closed."); } - this.tableName = params.getTableName(); - this.connection = conn; - this.conf = connection.getConfiguration(); - this.pool = params.getPool(); + this.conf = conn.getConfiguration(); this.listener = params.getListener(); - + if (params.getPool() == null) { + this.pool = HTable.getDefaultExecutor(conf); + cleanupPoolOnClose = true; + } else { + this.pool = params.getPool(); + cleanupPoolOnClose = false; + } ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? params.getWriteBufferSize() : tableConf.getWriteBufferSize(); this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); - this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.operationTimeout = conn.getConfiguration().getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, - writeRpcTimeout, operationTimeout); + this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != BufferedMutatorParams.UNSET ? + params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout()); + this.operationTimeout = new AtomicInteger(params.getOperationTimeout()!= BufferedMutatorParams.UNSET ? + params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout()); + this.ap = ap; + } + BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcFactory, BufferedMutatorParams params) { + this(conn, params, + // puts need to track errors globally due to how the APIs currently work. + new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, true, rpcFactory)); + } + + @VisibleForTesting + ExecutorService getPool() { + return pool; + } + + @VisibleForTesting + AsyncProcess getAsyncProcess() { + return ap; } @Override @@ -193,22 +201,22 @@ public class BufferedMutatorImpl implements BufferedMutator { // As we can have an operation in progress even if the buffer is empty, we call // backgroundFlushCommits at least one time. backgroundFlushCommits(true); - this.pool.shutdown(); - boolean terminated; - int loopCnt = 0; - do { - // wait until the pool has terminated - terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); - loopCnt += 1; - if (loopCnt >= 10) { - LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); - break; - } - } while (!terminated); - + if (cleanupPoolOnClose) { + this.pool.shutdown(); + boolean terminated; + int loopCnt = 0; + do { + // wait until the pool has terminated + terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); + loopCnt += 1; + if (loopCnt >= 10) { + LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); + break; + } + } while (!terminated); + } } catch (InterruptedException e) { LOG.warn("waitForTermination interrupted"); - } finally { this.closed = true; } @@ -239,8 +247,9 @@ public class BufferedMutatorImpl implements BufferedMutator { if (!synchronous) { QueueRowAccess taker = new QueueRowAccess(); + AsyncProcessTask task = wrapAsyncProcessTask(taker); try { - ap.submit(tableName, taker, true, null, false); + ap.submit(task); if (ap.hasError()) { LOG.debug(tableName + ": One or more of the operations have failed -" + " waiting for all operation in progress to finish (successfully or not)"); @@ -251,17 +260,17 @@ public class BufferedMutatorImpl implements BufferedMutator { } if (synchronous || ap.hasError()) { QueueRowAccess taker = new QueueRowAccess(); + AsyncProcessTask task = wrapAsyncProcessTask(taker); try { while (!taker.isEmpty()) { - ap.submit(tableName, taker, true, null, false); + ap.submit(task); taker.reset(); } } finally { taker.restoreRemainder(); } - RetriesExhaustedWithDetailsException error = - ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString()); + ap.waitForAllPreviousOpsAndReset(null, tableName); if (error != null) { if (listener == null) { throw error; @@ -273,8 +282,38 @@ public class BufferedMutatorImpl implements BufferedMutator { } /** + * Reuse the AsyncProcessTask when calling {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}. + * @param taker access the inner buffer. + * @return An AsyncProcessTask which always returns the latest rpc and operation timeout. + */ + private AsyncProcessTask wrapAsyncProcessTask(QueueRowAccess taker) { + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(taker) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE) + .build(); + return new AsyncProcessTask(task) { + @Override + public int getRpcTimeout() { + return rpcTimeout.get(); + } + + @Override + public int getOperationTimeout() { + return operationTimeout.get(); + } + }; + } + /** * This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought * not be called for production uses. + * If the new buffer size is smaller than the stored data, the {@link BufferedMutatorImpl#flush()} + * will be called. + * @param writeBufferSize The max size of internal buffer where data is stored. + * @throws org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException + * if an I/O error occurs and there are too many retries. + * @throws java.io.InterruptedIOException if the I/O task is interrupted. * @deprecated Going away when we drop public support for {@link HTable}. */ @Deprecated @@ -295,15 +334,23 @@ public class BufferedMutatorImpl implements BufferedMutator { } @Override - public void setRpcTimeout(int timeout) { - this.writeRpcTimeout = timeout; - ap.setRpcTimeout(timeout); + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout.set(rpcTimeout); } @Override - public void setOperationTimeout(int timeout) { - this.operationTimeout = timeout; - ap.setOperationTimeout(operationTimeout); + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout.set(operationTimeout); + } + + @VisibleForTesting + long getCurrentWriteBufferSize() { + return currentWriteBufferSize.get(); + } + + @VisibleForTesting + int size() { + return undealtMutationCount.get(); } private class QueueRowAccess implements RowAccess<Row> { http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java index 17c69ec..9c901e2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java @@ -39,7 +39,8 @@ public class BufferedMutatorParams implements Cloneable { private int maxKeyValueSize = UNSET; private ExecutorService pool = null; private String implementationClassName = null; - + private int rpcTimeout = UNSET; + private int operationTimeout = UNSET; private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { @Override public void onException(RetriesExhaustedWithDetailsException exception, @@ -61,6 +62,24 @@ public class BufferedMutatorParams implements Cloneable { return writeBufferSize; } + public BufferedMutatorParams rpcTimeout(final int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + return this; + } + + public int getRpcTimeout() { + return rpcTimeout; + } + + public BufferedMutatorParams opertationTimeout(final int operationTimeout) { + this.operationTimeout = operationTimeout; + return this; + } + + public int getOperationTimeout() { + return operationTimeout; + } + /** * Override the write buffer size specified by the provided {@link Connection}'s * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 35bebae..41f5baf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -42,7 +42,8 @@ public class ConnectionConfiguration { private final int replicaCallTimeoutMicroSecondScan; private final int retries; private final int maxKeyValueSize; - + private final int readRpcTimeout; + private final int writeRpcTimeout; // toggle for async/sync prefetch private final boolean clientScannerAsyncPrefetch; @@ -80,6 +81,12 @@ public class ConnectionConfiguration { Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH); this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); + + this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + + this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); } /** @@ -99,6 +106,16 @@ public class ConnectionConfiguration { this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; + this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + } + + public int getReadRpcTimeout() { + return readRpcTimeout; + } + + public int getWriteRpcTimeout() { + return writeRpcTimeout; } public long getWriteBufferSize() { http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index a597be3..ceac3fb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -249,7 +249,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); - this.asyncProcess = createAsyncProcess(this.conf); + this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, false, rpcControllerFactory); if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { this.metrics = new MetricsConnection(this); } else { @@ -1833,17 +1833,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { metaCache.clearCache(regionInfo); } - // For tests to override. - protected AsyncProcess createAsyncProcess(Configuration conf) { - // No default pool available. - int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, - rpcTimeout, operationTimeout); - } - @Override public AsyncProcess getAsyncProcess() { return asyncProcess; http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index dd11abf..fd5eda3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -103,27 +103,28 @@ import org.apache.hadoop.hbase.util.Threads; @InterfaceStability.Stable public class HTable implements Table { private static final Log LOG = LogFactory.getLog(HTable.class); - protected ClusterConnection connection; + private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG; + private final ClusterConnection connection; private final TableName tableName; - private volatile Configuration configuration; - private ConnectionConfiguration connConfiguration; - protected BufferedMutatorImpl mutator; + private final Configuration configuration; + private final ConnectionConfiguration connConfiguration; + @VisibleForTesting + BufferedMutatorImpl mutator; private boolean closed = false; - protected int scannerCaching; - protected long scannerMaxResultSize; - private ExecutorService pool; // For Multi & Scan + private final int scannerCaching; + private final long scannerMaxResultSize; + private final ExecutorService pool; // For Multi & Scan private int operationTimeout; // global timeout for each blocking method with retrying rpc private int readRpcTimeout; // timeout for each read rpc request private int writeRpcTimeout; // timeout for each write rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() - private final boolean cleanupConnectionOnClose; // close the connection in close() - private Consistency defaultConsistency = Consistency.STRONG; - private HRegionLocator locator; + private final HRegionLocator locator; /** The Async process for batch */ - protected AsyncProcess multiAp; - private RpcRetryingCallerFactory rpcCallerFactory; - private RpcControllerFactory rpcControllerFactory; + @VisibleForTesting + AsyncProcess multiAp; + private final RpcRetryingCallerFactory rpcCallerFactory; + private final RpcControllerFactory rpcControllerFactory; // Marked Private @since 1.0 @InterfaceAudience.Private @@ -167,22 +168,42 @@ public class HTable implements Table { throw new IllegalArgumentException("Given table name is null"); } this.tableName = tableName; - this.cleanupConnectionOnClose = false; this.connection = connection; this.configuration = connection.getConfiguration(); - this.connConfiguration = tableConfig; - this.pool = pool; + if (tableConfig == null) { + connConfiguration = new ConnectionConfiguration(configuration); + } else { + connConfiguration = tableConfig; + } if (pool == null) { this.pool = getDefaultExecutor(this.configuration); this.cleanupPoolOnClose = true; } else { + this.pool = pool; this.cleanupPoolOnClose = false; } + if (rpcCallerFactory == null) { + this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); + } else { + this.rpcCallerFactory = rpcCallerFactory; + } - this.rpcCallerFactory = rpcCallerFactory; - this.rpcControllerFactory = rpcControllerFactory; + if (rpcControllerFactory == null) { + this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); + } else { + this.rpcControllerFactory = rpcControllerFactory; + } + + this.operationTimeout = tableName.isSystemTable() ? + connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.readRpcTimeout = connConfiguration.getReadRpcTimeout(); + this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout(); + this.scannerCaching = connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); - this.finishSetup(); + // puts need to track errors globally due to how the APIs currently work. + multiAp = this.connection.getAsyncProcess(); + this.locator = new HRegionLocator(tableName, connection); } /** @@ -190,20 +211,23 @@ public class HTable implements Table { * @throws IOException */ @VisibleForTesting - protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException { + protected HTable(ClusterConnection conn, BufferedMutatorImpl mutator) throws IOException { connection = conn; - tableName = params.getTableName(); - connConfiguration = new ConnectionConfiguration(connection.getConfiguration()); + this.tableName = mutator.getName(); + this.configuration = connection.getConfiguration(); + connConfiguration = new ConnectionConfiguration(configuration); cleanupPoolOnClose = false; - cleanupConnectionOnClose = false; - // used from tests, don't trust the connection is real - this.mutator = new BufferedMutatorImpl(conn, null, null, params); - this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, - conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.mutator = mutator; + this.operationTimeout = tableName.isSystemTable() ? + connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.readRpcTimeout = connConfiguration.getReadRpcTimeout(); + this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout(); + this.scannerCaching = connConfiguration.getScannerCaching(); + this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); + this.rpcControllerFactory = null; + this.rpcCallerFactory = null; + this.pool = mutator.getPool(); + this.locator = null; } /** @@ -214,36 +238,6 @@ public class HTable implements Table { } /** - * setup this HTable's parameter based on the passed configuration - */ - private void finishSetup() throws IOException { - if (connConfiguration == null) { - connConfiguration = new ConnectionConfiguration(configuration); - } - - this.operationTimeout = tableName.isSystemTable() ? - connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); - this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, - configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.scannerCaching = connConfiguration.getScannerCaching(); - this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); - if (this.rpcCallerFactory == null) { - this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); - } - if (this.rpcControllerFactory == null) { - this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); - } - - // puts need to track errors globally due to how the APIs currently work. - multiAp = this.connection.getAsyncProcess(); - this.locator = new HRegionLocator(getName(), connection); - } - - /** * {@inheritDoc} */ @Override @@ -423,7 +417,7 @@ public class HTable implements Table { get = ReflectionUtils.newInstance(get.getClass(), get); get.setCheckExistenceOnly(checkExistenceOnly); if (get.getConsistency() == null){ - get.setConsistency(defaultConsistency); + get.setConsistency(DEFAULT_CONSISTENCY); } } @@ -483,13 +477,37 @@ public class HTable implements Table { @Override public void batch(final List<? extends Row> actions, final Object[] results) throws InterruptedException, IOException { - batch(actions, results, -1); + int rpcTimeout = writeRpcTimeout; + boolean hasRead = false; + boolean hasWrite = false; + for (Row action : actions) { + if (action instanceof Mutation) { + hasWrite = true; + } else { + hasRead = true; + } + if (hasRead && hasWrite) { + break; + } + } + if (hasRead && !hasWrite) { + rpcTimeout = readRpcTimeout; + } + batch(actions, results, rpcTimeout); } public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout) throws InterruptedException, IOException { - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, - rpcTimeout); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(actions) + .setResults(results) + .setRpcTimeout(rpcTimeout) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -509,8 +527,20 @@ public class HTable implements Table { public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results, Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - AsyncRequestFuture ars = connection.getAsyncProcess().submitAll( - pool, tableName, actions, callback, results); + int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout(); + int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback) + .setPool(pool) + .setTableName(tableName) + .setRowAccess(actions) + .setResults(results) + .setOperationTimeout(operationTimeout) + .setRpcTimeout(writeTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = connection.getAsyncProcess().submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -536,8 +566,16 @@ public class HTable implements Table { } }; List<Delete> rows = Collections.singletonList(delete); - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, - null, null, callable, writeRpcTimeout); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(rows) + .setCallable(callable) + .setRpcTimeout(writeRpcTimeout) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -615,8 +653,16 @@ public class HTable implements Table { return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); } }; - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, null, callable, writeRpcTimeout); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(rm.getMutations()) + .setCallable(callable) + .setRpcTimeout(writeRpcTimeout) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -795,8 +841,18 @@ public class HTable implements Table { }; List<Delete> rows = Collections.singletonList(delete); Object[] results = new Object[1]; - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, - null, results, callable, -1); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(rows) + .setCallable(callable) + // TODO any better timeout? + .setRpcTimeout(Math.max(readRpcTimeout, writeRpcTimeout)) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .setResults(results) + .build(); + AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -839,8 +895,18 @@ public class HTable implements Table { * It is excessive to send such a large array, but that is required by the framework right now * */ Object[] results = new Object[rm.getMutations().size()]; - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, results, callable, -1); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(pool) + .setTableName(tableName) + .setRowAccess(rm.getMutations()) + .setResults(results) + .setCallable(callable) + // TODO any better timeout? + .setRpcTimeout(Math.max(readRpcTimeout, writeRpcTimeout)) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -926,6 +992,10 @@ public class HTable implements Table { return; } flushCommits(); + if (mutator != null) { + mutator.close(); + mutator = null; + } if (cleanupPoolOnClose) { this.pool.shutdown(); try { @@ -939,11 +1009,6 @@ public class HTable implements Table { LOG.warn("waitForTermination interrupted"); } } - if (cleanupConnectionOnClose) { - if (this.connection != null) { - this.connection.close(); - } - } this.closed = true; } @@ -1102,7 +1167,6 @@ public class HTable implements Table { if (mutator != null) { mutator.setOperationTimeout(operationTimeout); } - multiAp.setOperationTimeout(operationTimeout); } @Override @@ -1134,7 +1198,6 @@ public class HTable implements Table { if (mutator != null) { mutator.setRpcTimeout(writeRpcTimeout); } - multiAp.setRpcTimeout(writeRpcTimeout); } @Override @@ -1217,37 +1280,41 @@ public class HTable implements Table { Object[] results = new Object[execs.size()]; AsyncProcess asyncProcess = - new AsyncProcess(connection, configuration, pool, + new AsyncProcess(connection, configuration, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration), readRpcTimeout, - operationTimeout); - - AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs, - new Callback<ClientProtos.CoprocessorServiceResult>() { - @Override - public void update(byte[] region, byte[] row, - ClientProtos.CoprocessorServiceResult serviceResult) { - if (LOG.isTraceEnabled()) { - LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() + - ": region=" + Bytes.toStringBinary(region) + - ", row=" + Bytes.toStringBinary(row) + - ", value=" + serviceResult.getValue().getValue()); - } - try { - Message.Builder builder = responsePrototype.newBuilderForType(); - org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder, - serviceResult.getValue().getValue().toByteArray()); - callback.update(region, row, (R) builder.build()); - } catch (IOException e) { - LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(), - e); - callbackErrorExceptions.add(e); - callbackErrorActions.add(execsByRow.get(row)); - callbackErrorServers.add("null"); - } - } - }, results); - + true, RpcControllerFactory.instantiate(configuration)); + + Callback<ClientProtos.CoprocessorServiceResult> resultsCallback + = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> { + if (LOG.isTraceEnabled()) { + LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() + + ": region=" + Bytes.toStringBinary(region) + + ", row=" + Bytes.toStringBinary(row) + + ", value=" + serviceResult.getValue().getValue()); + } + try { + Message.Builder builder = responsePrototype.newBuilderForType(); + org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder, + serviceResult.getValue().getValue().toByteArray()); + callback.update(region, row, (R) builder.build()); + } catch (IOException e) { + LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(), + e); + callbackErrorExceptions.add(e); + callbackErrorActions.add(execsByRow.get(row)); + callbackErrorServers.add("null"); + } + }; + AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task = AsyncProcessTask.newBuilder(resultsCallback) + .setPool(pool) + .setTableName(tableName) + .setRowAccess(execs) + .setResults(results) + .setRpcTimeout(readRpcTimeout) + .setOperationTimeout(operationTimeout) + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) + .build(); + AsyncRequestFuture future = asyncProcess.submit(task); future.waitUntilDone(); if (future.hasError()) { @@ -1270,10 +1337,10 @@ public class HTable implements Table { .pool(pool) .writeBufferSize(connConfiguration.getWriteBufferSize()) .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) + .opertationTimeout(operationTimeout) + .rpcTimeout(writeRpcTimeout) ); } - mutator.setRpcTimeout(writeRpcTimeout); - mutator.setOperationTimeout(operationTimeout); return mutator; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 8ff64bf..c03b969 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -443,7 +443,7 @@ public class HTableMultiplexer { private final AtomicInteger retryInQueue = new AtomicInteger(0); private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor private final int operationTimeout; - + private final ExecutorService pool; public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, ExecutorService pool, ScheduledExecutorService executor) { @@ -457,10 +457,10 @@ public class HTableMultiplexer { HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, - writeRpcTimeout, operationTimeout); + this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); + this.pool = pool; } protected LinkedBlockingQueue<PutStatus> getQueue() { @@ -594,9 +594,14 @@ public class HTableMultiplexer { Map<ServerName, MultiAction> actionsByServer = Collections.singletonMap(server, actions); try { + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setResults(results) + .setPool(pool) + .setRpcTimeout(writeRpcTimeout) + .setOperationTimeout(operationTimeout) + .build(); AsyncRequestFuture arf = - ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null, - null, actionsByServer, null); + ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer); arf.waitUntilDone(); if (arf.hasError()) { // We just log and ignore the exception here since failed Puts will be resubmit again.