Repository: hbase
Updated Branches:
  refs/heads/master de5a3a006 -> c000f29e4


HBASE-16224 Reduce the number of RPCs for the large PUTs (ChiaPing Tsai)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c000f29e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c000f29e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c000f29e

Branch: refs/heads/master
Commit: c000f29e47b9a6a7ffa71848f0df3d5c1a72312e
Parents: de5a3a0
Author: chenheng <chenh...@apache.org>
Authored: Tue Aug 30 06:35:33 2016 +0800
Committer: chenheng <chenh...@apache.org>
Committed: Tue Aug 30 06:35:33 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 528 +++++++++++++++----
 .../hbase/client/BufferedMutatorImpl.java       | 151 ++++--
 .../apache/hadoop/hbase/client/RowAccess.java   |  44 ++
 .../hadoop/hbase/client/TestAsyncProcess.java   | 430 ++++++++++++++-
 .../coprocessor/MultiRowMutationEndpoint.java   |   2 +-
 5 files changed, 997 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
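
Among other things, the patch adds two client-side heap-size limits, hbase.client.max.perrequest.heapsize and hbase.client.max.submit.heapsize (see the AsyncProcess diff below). A minimal sketch of how a client could tune them; the class name is illustrative and the 4 MB values simply restate the defaults added by this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class TuneClientHeapLimits {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Upper bound on the estimated heap size of mutations grouped into one
    // multi request for a single region server.
    conf.setLong("hbase.client.max.perrequest.heapsize", 4L * 1024 * 1024);
    // Upper bound on the estimated heap size taken by one AsyncProcess submit call.
    conf.setLong("hbase.client.max.submit.heapsize", 4L * 1024 * 1024);
    // A Connection/BufferedMutator built from this conf should then split large
    // put batches into per-server requests bounded by these limits.
  }
}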


http://git-wip-us.apache.org/repos/asf/hbase/blob/c000f29e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index d699233..045885f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -19,17 +19,13 @@
 
 package org.apache.hadoop.hbase.client;
 
-import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.InterruptedIOException;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -38,31 +34,39 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.RetryImmediatelyException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.RetryImmediatelyException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.htrace.Trace;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class  allows a continuous flow of requests. It's written to be 
compatible with a
  * synchronous caller such as HTable.
@@ -127,6 +131,25 @@ class AsyncProcess {
   private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
 
   /**
+   * The maximum heap size of all mutations sent to a single RegionServer in one request.
+   */
+  public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 
"hbase.client.max.perrequest.heapsize";
+
+  /**
+   * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}.
+   */
+  public static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 
4194304;
+
+  /**
+   * The maximum heap size of mutations taken by a single submit call.
+   */
+  public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = 
"hbase.client.max.submit.heapsize";
+  /**
+   * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}.
+   */
+  public static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = 
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
+
+  /**
    * The context used to wait for results from one submit call.
    * 1) If AsyncProcess is set to track errors globally, and not per call (for 
HTable puts),
    *    then errors and failed operations in this object will reflect global 
errors.
@@ -208,7 +231,6 @@ class AsyncProcess {
       new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
   protected final ConcurrentMap<ServerName, AtomicInteger> 
taskCounterPerServer =
       new ConcurrentHashMap<ServerName, AtomicInteger>();
-
   // Start configuration settings.
   private final int startLogErrorsCnt;
 
@@ -218,6 +240,11 @@ class AsyncProcess {
   protected final int maxTotalConcurrentTasks;
 
   /**
+   * The max heap size of all tasks simultaneously executed on a server.
+   */
+  protected final long maxHeapSizePerRequest;
+  protected final long maxHeapSizeSubmit;
+  /**
    * The number of tasks we run in parallel on a single region.
    * With 1 (the default) , we ensure that the ordering of the queries is 
respected: we don't start
    * a set of operations on a region before the previous one is done. As well, 
this limits
@@ -278,7 +305,6 @@ class AsyncProcess {
       addresses.addAll(other.addresses);
     }
   }
-
   public AsyncProcess(ClusterConnection hc, Configuration conf, 
ExecutorService pool,
       RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
       RpcControllerFactory rpcFactory, int rpcTimeout) {
@@ -306,7 +332,9 @@ class AsyncProcess {
           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
     this.maxConcurrentTasksPerRegion = 
conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
-
+    this.maxHeapSizePerRequest = 
conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
+          DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
+    this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, 
DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
     this.startLogErrorsCnt =
         conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 
DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
 
@@ -321,7 +349,15 @@ class AsyncProcess {
       throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
           maxConcurrentTasksPerRegion);
     }
+    if (this.maxHeapSizePerRequest <= 0) {
+      throw new IllegalArgumentException("maxHeapSizePerServer=" +
+          maxHeapSizePerRequest);
+    }
 
+    if (this.maxHeapSizeSubmit <= 0) {
+      throw new IllegalArgumentException("maxHeapSizeSubmit=" +
+          maxHeapSizeSubmit);
+    }
     // Server tracker allows us to do faster, and yet useful (hopefully), 
retries.
     // However, if we are too useful, we might fail very quickly due to retry 
count limit.
     // To avoid this, we are going to cheat for now (see HBASE-7659), and 
calculate maximum
@@ -356,16 +392,34 @@ class AsyncProcess {
     }
     throw new RuntimeException("Neither AsyncProcess nor request have 
ExecutorService");
   }
-
   /**
    * See {@link #submit(ExecutorService, TableName, List, boolean, 
Batch.Callback, boolean)}.
    * Uses default ExecutorService for this AP (must have been created with 
one).
    */
-  public <CResult> AsyncRequestFuture submit(TableName tableName, List<? 
extends Row> rows,
+  public <CResult> AsyncRequestFuture submit(TableName tableName, final List<? 
extends Row> rows,
       boolean atLeastOne, Batch.Callback<CResult> callback, boolean 
needResults)
       throws InterruptedIOException {
     return submit(null, tableName, rows, atLeastOne, callback, needResults);
   }
+  /**
+   * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, 
Batch.Callback, boolean)}.
+   * Uses default ExecutorService for this AP (must have been created with 
one).
+   */
+  public <CResult> AsyncRequestFuture submit(TableName tableName,
+      final RowAccess<? extends Row> rows, boolean atLeastOne, 
Batch.Callback<CResult> callback,
+      boolean needResults) throws InterruptedIOException {
+    return submit(null, tableName, rows, atLeastOne, callback, needResults);
+  }
+  /**
+   * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, 
Batch.Callback, boolean)}.
+   * Uses the {@link ListRowAccess} to wrap the {@link List}.
+   */
+  public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName 
tableName,
+      List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> 
callback,
+      boolean needResults) throws InterruptedIOException {
+    return submit(pool, tableName, new ListRowAccess(rows), atLeastOne,
+      callback, needResults);
+  }
 
   /**
    * Extract from the rows list what we can submit. The rows we can not submit 
are kept in the
@@ -380,7 +434,7 @@ class AsyncProcess {
    * @param atLeastOne true if we should submit at least a subset.
    */
   public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName 
tableName,
-      List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> 
callback,
+      RowAccess<? extends Row> rows, boolean atLeastOne, 
Batch.Callback<CResult> callback,
       boolean needResults) throws InterruptedIOException {
     if (rows.isEmpty()) {
       return NO_REQS_RESULT;
@@ -396,16 +450,15 @@ class AsyncProcess {
     // Location errors that happen before we decide what requests to take.
     List<Exception> locationErrors = null;
     List<Integer> locationErrorRows = null;
+    RowCheckerHost checker = createRowCheckerHost();
+    boolean firstIter = true;
     do {
       // Wait until there is at least one slot for a new task.
       waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, 
tableName.getNameAsString());
-
-      // Remember the previous decisions about regions or region servers we 
put in the
-      //  final multi.
-      Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, 
Boolean>();
-      Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, 
Boolean>();
-
       int posInList = -1;
+      if (!firstIter) {
+        checker.reset();
+      }
       Iterator<? extends Row> it = rows.iterator();
       while (it.hasNext()) {
         Row r = it.next();
@@ -434,8 +487,12 @@ class AsyncProcess {
           it.remove();
           break; // Backward compat: we stop considering actions on location 
error.
         }
-
-        if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
+        long rowSize = (r instanceof Mutation) ? ((Mutation) r).heapSize() : 0;
+        ReturnCode code = checker.canTakeOperation(loc, rowSize);
+        if (code == ReturnCode.END) {
+          break;
+        }
+        if (code == ReturnCode.INCLUDE) {
           Action<Row> action = new Action<Row>(r, ++posInList);
           setNonce(ng, r, action);
           retainedActions.add(action);
@@ -445,6 +502,7 @@ class AsyncProcess {
           it.remove();
         }
       }
+      firstIter = false;
     } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == 
null));
 
     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
@@ -453,6 +511,18 @@ class AsyncProcess {
         locationErrors, locationErrorRows, actionsByServer, pool);
   }
 
+  private RowCheckerHost createRowCheckerHost() {
+    return new RowCheckerHost(Arrays.asList(
+        new TaskCountChecker(maxTotalConcurrentTasks,
+          maxConcurrentTasksPerServer,
+          maxConcurrentTasksPerRegion,
+          tasksInProgress,
+          taskCounterPerServer,
+          taskCounterPerRegion)
+        , new RequestSizeChecker(maxHeapSizePerRequest)
+        , new SubmittedSizeChecker(maxHeapSizeSubmit)
+    ));
+  }
   <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
       List<Action<Row>> retainedActions, long nonceGroup, 
Batch.Callback<CResult> callback,
       Object[] results, boolean needResults, List<Exception> locationErrors,
@@ -494,74 +564,6 @@ class AsyncProcess {
 
     multiAction.add(regionName, action);
   }
-
-  /**
-   * Check if we should send new operations to this region or region server.
-   * We're taking into account the past decision; if we have already accepted
-   * operation on a given region, we accept all operations for this region.
-   *
-   * @param loc; the region and the server name we want to use.
-   * @return true if this region is considered as busy.
-   */
-  protected boolean canTakeOperation(HRegionLocation loc,
-                                     Map<HRegionInfo, Boolean> regionsIncluded,
-                                     Map<ServerName, Boolean> serversIncluded) 
{
-    HRegionInfo regionInfo = loc.getRegionInfo();
-    Boolean regionPrevious = regionsIncluded.get(regionInfo);
-
-    if (regionPrevious != null) {
-      // We already know what to do with this region.
-      return regionPrevious;
-    }
-
-    Boolean serverPrevious = serversIncluded.get(loc.getServerName());
-    if (Boolean.FALSE.equals(serverPrevious)) {
-      // It's a new region, on a region server that we have already excluded.
-      regionsIncluded.put(regionInfo, Boolean.FALSE);
-      return false;
-    }
-
-    AtomicInteger regionCnt = 
taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
-    if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
-      // Too many tasks on this region already.
-      regionsIncluded.put(regionInfo, Boolean.FALSE);
-      return false;
-    }
-
-    if (serverPrevious == null) {
-      // The region is ok, but we need to decide for this region server.
-      int newServers = 0; // number of servers we're going to contact so far
-      for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
-        if (kv.getValue()) {
-          newServers++;
-        }
-      }
-
-      // Do we have too many total tasks already?
-      boolean ok = (newServers + tasksInProgress.get()) < 
maxTotalConcurrentTasks;
-
-      if (ok) {
-        // If the total is fine, is it ok for this individual server?
-        AtomicInteger serverCnt = 
taskCounterPerServer.get(loc.getServerName());
-        ok = (serverCnt == null || serverCnt.get() < 
maxConcurrentTasksPerServer);
-      }
-
-      if (!ok) {
-        regionsIncluded.put(regionInfo, Boolean.FALSE);
-        serversIncluded.put(loc.getServerName(), Boolean.FALSE);
-        return false;
-      }
-
-      serversIncluded.put(loc.getServerName(), Boolean.TRUE);
-    } else {
-      assert serverPrevious.equals(Boolean.TRUE);
-    }
-
-    regionsIncluded.put(regionInfo, Boolean.TRUE);
-
-    return true;
-  }
-
   /**
    * See {@link #submitAll(ExecutorService, TableName, List, Batch.Callback, 
Object[])}.
    * Uses default ExecutorService for this AP (must have been created with 
one).
@@ -740,7 +742,7 @@ class AsyncProcess {
       private final int numAttempt;
       private final ServerName server;
       private final Set<CancellableRegionServerCallable> callsInProgress;
-
+      private Long heapSize = null;
       private SingleServerRequestRunnable(
           MultiAction<Row> multiAction, int numAttempt, ServerName server,
           Set<CancellableRegionServerCallable> callsInProgress) {
@@ -750,6 +752,24 @@ class AsyncProcess {
         this.callsInProgress = callsInProgress;
       }
 
+      @VisibleForTesting
+      long heapSize() {
+        if (heapSize != null) {
+          return heapSize;
+        }
+        heapSize = 0L;
+        for (Map.Entry<byte[], List<Action<Row>>> e: 
this.multiAction.actions.entrySet()) {
+          List<Action<Row>> actions = e.getValue();
+          for (Action<Row> action: actions) {
+            Row row = action.getAction();
+            if (row instanceof Mutation) {
+              heapSize += ((Mutation) row).heapSize();
+            }
+          }
+        }
+        return heapSize;
+      }
+
       @Override
       public void run() {
         MultiResponse res;
@@ -831,7 +851,7 @@ class AsyncProcess {
     private final long nonceGroup;
     private CancellableRegionServerCallable currentCallable;
     private int currentCallTotalTimeout;
-
+    private final Map<ServerName, List<Long>> heapSizesByServer = new 
HashMap<>();
     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> 
actions, long nonceGroup,
         ExecutorService pool, boolean needResults, Object[] results,
         Batch.Callback<CResult> callback, CancellableRegionServerCallable 
callable, int timeout) {
@@ -910,7 +930,21 @@ class AsyncProcess {
     public Set<CancellableRegionServerCallable> getCallsInProgress() {
       return callsInProgress;
     }
+    @VisibleForTesting
+    Map<ServerName, List<Long>> getRequestHeapSize() {
+      return heapSizesByServer;
+    }
 
+    private SingleServerRequestRunnable 
addSingleServerRequestHeapSize(ServerName server,
+        SingleServerRequestRunnable runnable) {
+      List<Long> heapCount = heapSizesByServer.get(server);
+      if (heapCount == null) {
+        heapCount = new LinkedList<>();
+        heapSizesByServer.put(server, heapCount);
+      }
+      heapCount.add(runnable.heapSize());
+      return runnable;
+    }
     /**
      * Group a list of actions per region servers, and send them.
      *
@@ -1080,8 +1114,9 @@ class AsyncProcess {
         if (connection.getConnectionMetrics() != null) {
           connection.getConnectionMetrics().incrNormalRunners();
         }
-        return 
Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
-            new SingleServerRequestRunnable(multiAction, numAttempt, server, 
callsInProgress)));
+        SingleServerRequestRunnable runnable = 
addSingleServerRequestHeapSize(server,
+          new SingleServerRequestRunnable(multiAction, numAttempt, server, 
callsInProgress));
+        return 
Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable));
       }
 
       // group the actions by the amount of delay
@@ -1102,9 +1137,8 @@ class AsyncProcess {
       List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
       for (DelayingRunner runner : actions.values()) {
         String traceText = "AsyncProcess.sendMultiAction";
-        Runnable runnable =
-            new SingleServerRequestRunnable(runner.getActions(), numAttempt, 
server,
-                callsInProgress);
+        Runnable runnable = addSingleServerRequestHeapSize(server,
+          new SingleServerRequestRunnable(runner.getActions(), numAttempt, 
server, callsInProgress));
         // use a delay runner only if we need to sleep for some time
         if (runner.getSleepTime() > 0) {
           runner.setRunner(runnable);
@@ -1941,4 +1975,284 @@ class AsyncProcess {
     NO_RETRIES_EXHAUSTED,
     NO_OTHER_SUCCEEDED
   }
+
+  /**
+   * Collect the advice from all checkers and make the final decision.
+   */
+  @VisibleForTesting
+  static class RowCheckerHost {
+    private final List<RowChecker> checkers;
+    private boolean isEnd = false;
+    RowCheckerHost(final List<RowChecker> checkers) {
+      this.checkers = checkers;
+    }
+    void reset() throws InterruptedIOException {
+      isEnd = false;
+      InterruptedIOException e = null;
+      for (RowChecker checker : checkers) {
+        try {
+          checker.reset();
+        } catch (InterruptedIOException ex) {
+          e = ex;
+        }
+      }
+      if (e != null) {
+        throw e;
+      }
+    }
+    ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
+      if (isEnd) {
+        return ReturnCode.END;
+      }
+      ReturnCode code = ReturnCode.INCLUDE;
+      for (RowChecker checker : checkers) {
+        switch (checker.canTakeOperation(loc, rowSize)) {
+          case END:
+            isEnd = true;
+            code = ReturnCode.END;
+            break;
+          case SKIP:
+            code = ReturnCode.SKIP;
+            break;
+          case INCLUDE:
+          default:
+            break;
+        }
+        if (code == ReturnCode.END) {
+          break;
+        }
+      }
+      for (RowChecker checker : checkers) {
+        checker.notifyFinal(code, loc, rowSize);
+      }
+      return code;
+    }
+  }
+
+  /**
+   * Provide a way to control the flow of iteration over the rows.
+   */
+  @VisibleForTesting
+  interface RowChecker {
+    enum ReturnCode {
+      /**
+       * Accept current row.
+       */
+      INCLUDE,
+      /**
+       * Skip current row.
+       */
+      SKIP,
+      /**
+       * No more rows can be included.
+       */
+      END
+    };
+    ReturnCode canTakeOperation(HRegionLocation loc, long rowSize);
+    /**
+     * Notify the checker of the final ReturnCode.
+     * The checker's own ReturnCode may be overridden by the other checkers, so each
+     * checker needs the final decision in order to update its inner state.
+     */
+    void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize);
+    /**
+     * Reset the inner state.
+     */
+    void reset() throws InterruptedIOException;
+  }
+
+  /**
+   * Limit the heap size of total submitted data.
+   * Reduce this limit so that data is submitted more quickly
+   * when there is no running task.
+   */
+  @VisibleForTesting
+  static class SubmittedSizeChecker implements RowChecker {
+    private final long maxHeapSizeSubmit;
+    private long heapSize = 0;
+    SubmittedSizeChecker(final long maxHeapSizeSubmit) {
+      this.maxHeapSizeSubmit = maxHeapSizeSubmit;
+    }
+    @Override
+    public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
+      if (heapSize >= maxHeapSizeSubmit) {
+        return ReturnCode.END;
+      }
+      return ReturnCode.INCLUDE;
+    }
+
+    @Override
+    public void notifyFinal(ReturnCode code, HRegionLocation loc, long 
rowSize) {
+      if (code == ReturnCode.INCLUDE) {
+        heapSize += rowSize;
+      }
+    }
+
+    @Override
+    public void reset() {
+      heapSize = 0;
+    }
+  }
+  /**
+   * Limit the max number of concurrent tasks in an AsyncProcess.
+   */
+  @VisibleForTesting
+  static class TaskCountChecker implements RowChecker {
+    private static final long MAX_WAITING_TIME = 1000; //ms
+    private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
+    private final Set<ServerName> serversIncluded = new HashSet<>();
+    private final int maxConcurrentTasksPerRegion;
+    private final int maxTotalConcurrentTasks;
+    private final int maxConcurrentTasksPerServer;
+    private final Map<byte[], AtomicInteger> taskCounterPerRegion;
+    private final Map<ServerName, AtomicInteger> taskCounterPerServer;
+    private final Set<byte[]> busyRegions = new 
TreeSet<>(Bytes.BYTES_COMPARATOR);
+    private final AtomicLong tasksInProgress;
+    TaskCountChecker(final int maxTotalConcurrentTasks,
+      final int maxConcurrentTasksPerServer,
+      final int maxConcurrentTasksPerRegion,
+      final AtomicLong tasksInProgress,
+      final Map<ServerName, AtomicInteger> taskCounterPerServer,
+      final Map<byte[], AtomicInteger> taskCounterPerRegion) {
+      this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
+      this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
+      this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
+      this.taskCounterPerRegion = taskCounterPerRegion;
+      this.taskCounterPerServer = taskCounterPerServer;
+      this.tasksInProgress = tasksInProgress;
+    }
+    @Override
+    public void reset() throws InterruptedIOException {
+      // prevent busy-waiting
+      waitForRegion();
+      regionsIncluded.clear();
+      serversIncluded.clear();
+      busyRegions.clear();
+    }
+    private void waitForRegion() throws InterruptedIOException {
+      if (busyRegions.isEmpty()) {
+        return;
+      }
+      EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+      final long start = ee.currentTime();
+      while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
+        for (byte[] region : busyRegions) {
+          AtomicInteger count = taskCounterPerRegion.get(region);
+          if (count == null || count.get() < maxConcurrentTasksPerRegion) {
+            return;
+          }
+        }
+        try {
+          synchronized (tasksInProgress) {
+            tasksInProgress.wait(10);
+          }
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException("Interrupted." +
+              " tasksInProgress=" + tasksInProgress);
+        }
+      }
+    }
+    /**
+     * 1) check whether the region has already been included.
+     * 2) check the concurrent tasks for the region.
+     * 3) check the total concurrent tasks.
+     * 4) check the concurrent tasks for the server.
+     * @param loc the destination of the data
+     * @param rowSize the heap size of the data
+     * @return the decision about whether to include the row
+     */
+    @Override
+    public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
+
+      HRegionInfo regionInfo = loc.getRegionInfo();
+      if (regionsIncluded.contains(regionInfo)) {
+        // We already know what to do with this region.
+        return ReturnCode.INCLUDE;
+      }
+      AtomicInteger regionCnt = 
taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
+      if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) 
{
+        // Too many tasks on this region already.
+        return ReturnCode.SKIP;
+      }
+      int newServers = serversIncluded.size()
+        + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
+      if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
+        // Too many tasks.
+        return ReturnCode.SKIP;
+      }
+      AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
+      if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) 
{
+        // Too many tasks for this individual server
+        return ReturnCode.SKIP;
+      }
+      return ReturnCode.INCLUDE;
+    }
+
+    @Override
+    public void notifyFinal(ReturnCode code, HRegionLocation loc, long 
rowSize) {
+      if (code == ReturnCode.INCLUDE) {
+        regionsIncluded.add(loc.getRegionInfo());
+        serversIncluded.add(loc.getServerName());
+      }
+      busyRegions.add(loc.getRegionInfo().getRegionName());
+    }
+  }
+
+  /**
+   * Limit the heap size of the request sent to each region server.
+   */
+  @VisibleForTesting
+  static class RequestSizeChecker implements RowChecker {
+    private final long maxHeapSizePerRequest;
+    private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
+    RequestSizeChecker(final long maxHeapSizePerRequest) {
+      this.maxHeapSizePerRequest = maxHeapSizePerRequest;
+    }
+    @Override
+    public void reset() {
+      serverRequestSizes.clear();
+    }
+    @Override
+    public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
+      // Is the accumulated request size for this server within the limit?
+      long currentRequestSize = 
serverRequestSizes.containsKey(loc.getServerName()) ?
+        serverRequestSizes.get(loc.getServerName()) : 0L;
+      // accept at least one request
+      if (currentRequestSize == 0 || currentRequestSize + rowSize <= 
maxHeapSizePerRequest) {
+        return ReturnCode.INCLUDE;
+      }
+      return ReturnCode.SKIP;
+    }
+
+    @Override
+    public void notifyFinal(ReturnCode code, HRegionLocation loc, long 
rowSize) {
+      if (code == ReturnCode.INCLUDE) {
+        long currentRequestSize = 
serverRequestSizes.containsKey(loc.getServerName()) ?
+          serverRequestSizes.get(loc.getServerName()) : 0L;
+        serverRequestSizes.put(loc.getServerName(), currentRequestSize + 
rowSize);
+      }
+    }
+  }
+
+  public static class ListRowAccess<T> implements RowAccess<T> {
+    private final List<T> data;
+    ListRowAccess(final List<T> data) {
+      this.data = data;
+    }
+
+    @Override
+    public int size() {
+      return data.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return data.isEmpty();
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+      return data.iterator();
+    }
+  }
 }
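
The RowCheckerHost above combines the votes of the individual checkers: END wins
over SKIP, SKIP wins over INCLUDE, and every checker is then handed the final
decision through notifyFinal so it can update its bookkeeping. A standalone
sketch of that combination rule (illustration only, not the HBase classes; the
4 MB threshold in main is made up):

import java.util.Arrays;
import java.util.List;

public class CheckerVoteDemo {
  enum ReturnCode { INCLUDE, SKIP, END }

  interface Checker {
    ReturnCode vote(long rowSize);
    default void notifyFinal(ReturnCode code, long rowSize) { }
  }

  // Mirrors the host's rule: END short-circuits, SKIP can still be raised to END
  // by a later checker, and every checker sees the final code afterwards.
  static ReturnCode decide(List<Checker> checkers, long rowSize) {
    ReturnCode code = ReturnCode.INCLUDE;
    for (Checker c : checkers) {
      ReturnCode v = c.vote(rowSize);
      if (v == ReturnCode.END) {
        code = ReturnCode.END;
        break;
      }
      if (v == ReturnCode.SKIP) {
        code = ReturnCode.SKIP;
      }
    }
    for (Checker c : checkers) {
      c.notifyFinal(code, rowSize);
    }
    return code;
  }

  public static void main(String[] args) {
    Checker sizeCap = rowSize -> rowSize > 4L * 1024 * 1024 ? ReturnCode.SKIP : ReturnCode.INCLUDE;
    Checker neverEnds = rowSize -> ReturnCode.INCLUDE;
    System.out.println(decide(Arrays.asList(sizeCap, neverEnds), 100));              // INCLUDE
    System.out.println(decide(Arrays.asList(sizeCap, neverEnds), 8L * 1024 * 1024)); // SKIP
  }
}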

http://git-wip-us.apache.org/repos/asf/hbase/blob/c000f29e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 39e4f75..2d4c8b3 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -28,11 +28,13 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -69,6 +71,12 @@ public class BufferedMutatorImpl implements BufferedMutator {
   @VisibleForTesting
   AtomicLong currentWriteBufferSize = new AtomicLong(0);
 
+  /**
+   * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}.
+   * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation.
+   */
+  @VisibleForTesting
+  AtomicInteger undealtMutationCount = new AtomicInteger(0);
   private long writeBufferSize;
   private final int maxKeyValueSize;
   private boolean closed = false;
@@ -129,11 +137,13 @@ public class BufferedMutatorImpl implements 
BufferedMutator {
     }
 
     long toAddSize = 0;
+    int toAddCount = 0;
     for (Mutation m : ms) {
       if (m instanceof Put) {
         validatePut((Put) m);
       }
       toAddSize += m.heapSize();
+      ++toAddCount;
     }
 
     // This behavior is highly non-intuitive... it does not protect us against
@@ -142,14 +152,17 @@ public class BufferedMutatorImpl implements 
BufferedMutator {
     if (ap.hasError()) {
       currentWriteBufferSize.addAndGet(toAddSize);
       writeAsyncBuffer.addAll(ms);
+      undealtMutationCount.addAndGet(toAddCount);
       backgroundFlushCommits(true);
     } else {
       currentWriteBufferSize.addAndGet(toAddSize);
       writeAsyncBuffer.addAll(ms);
+      undealtMutationCount.addAndGet(toAddCount);
     }
 
     // Now try and queue what needs to be queued.
-    while (currentWriteBufferSize.get() > writeBufferSize) {
+    while (undealtMutationCount.get() != 0
+        && currentWriteBufferSize.get() > writeBufferSize) {
       backgroundFlushCommits(false);
     }
   }
@@ -208,58 +221,41 @@ public class BufferedMutatorImpl implements 
BufferedMutator {
   private void backgroundFlushCommits(boolean synchronous) throws
       InterruptedIOException,
       RetriesExhaustedWithDetailsException {
+    if (!synchronous && writeAsyncBuffer.isEmpty()) {
+      return;
+    }
 
-    LinkedList<Mutation> buffer = new LinkedList<>();
-    // Keep track of the size so that this thread doesn't spin forever
-    long dequeuedSize = 0;
-
-    try {
-      // Grab all of the available mutations.
-      Mutation m;
-
-      // If there's no buffer size drain everything. If there is a buffersize 
drain up to twice
-      // that amount. This should keep the loop from continually spinning if 
there are threads
-      // that keep adding more data to the buffer.
-      while (
-          (writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || 
synchronous)
-              && (m = writeAsyncBuffer.poll()) != null) {
-        buffer.add(m);
-        long size = m.heapSize();
-        dequeuedSize += size;
-        currentWriteBufferSize.addAndGet(-size);
-      }
-
-      if (!synchronous && dequeuedSize == 0) {
-        return;
-      }
-
-      if (!synchronous) {
-        ap.submit(tableName, buffer, true, null, false);
+    if (!synchronous) {
+      QueueRowAccess taker = new QueueRowAccess();
+      try {
+        ap.submit(tableName, taker, true, null, false);
         if (ap.hasError()) {
           LOG.debug(tableName + ": One or more of the operations have failed -"
               + " waiting for all operation in progress to finish 
(successfully or not)");
         }
+      } finally {
+        taker.restoreRemainder();
       }
-      if (synchronous || ap.hasError()) {
-        while (!buffer.isEmpty()) {
-          ap.submit(tableName, buffer, true, null, false);
-        }
-        RetriesExhaustedWithDetailsException error =
-            ap.waitForAllPreviousOpsAndReset(null, 
tableName.getNameAsString());
-        if (error != null) {
-          if (listener == null) {
-            throw error;
-          } else {
-            this.listener.onException(error, this);
-          }
+    }
+    if (synchronous || ap.hasError()) {
+      QueueRowAccess taker = new QueueRowAccess();
+      try {
+        while (!taker.isEmpty()) {
+          ap.submit(tableName, taker, true, null, false);
+          taker.reset();
         }
+      } finally {
+        taker.restoreRemainder();
       }
-    } finally {
-      for (Mutation mut : buffer) {
-        long size = mut.heapSize();
-        currentWriteBufferSize.addAndGet(size);
-        dequeuedSize -= size;
-        writeAsyncBuffer.add(mut);
+
+      RetriesExhaustedWithDetailsException error =
+          ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
+      if (error != null) {
+        if (listener == null) {
+          throw error;
+        } else {
+          this.listener.onException(error, this);
+        }
       }
     }
   }
@@ -285,4 +281,67 @@ public class BufferedMutatorImpl implements 
BufferedMutator {
   public long getWriteBufferSize() {
     return this.writeBufferSize;
   }
+
+  private class QueueRowAccess implements RowAccess<Row> {
+    private int remainder = undealtMutationCount.getAndSet(0);
+
+    void reset() {
+      restoreRemainder();
+      remainder = undealtMutationCount.getAndSet(0);
+    }
+
+    @Override
+    public Iterator<Row> iterator() {
+      return new Iterator<Row>() {
+        private final Iterator<Mutation> iter = writeAsyncBuffer.iterator();
+        private int countDown = remainder;
+        private Mutation last = null;
+        @Override
+        public boolean hasNext() {
+          if (countDown <= 0) {
+            return false;
+          }
+          return iter.hasNext();
+        }
+        @Override
+        public Row next() {
+          if (!hasNext()) {
+            throw new NoSuchElementException();
+          }
+          last = iter.next();
+          if (last == null) {
+            throw new NoSuchElementException();
+          }
+          --countDown;
+          return last;
+        }
+        @Override
+        public void remove() {
+          if (last == null) {
+            throw new IllegalStateException();
+          }
+          iter.remove();
+          currentWriteBufferSize.addAndGet(-last.heapSize());
+          --remainder;
+        }
+      };
+    }
+
+    @Override
+    public int size() {
+      return remainder;
+    }
+
+    void restoreRemainder() {
+      if (remainder > 0) {
+        undealtMutationCount.addAndGet(remainder);
+        remainder = 0;
+      }
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return remainder <= 0;
+    }
+  }
 }
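
The new undealtMutationCount above exists because ConcurrentLinkedQueue.size()
walks the whole queue, so the mutator keeps an AtomicInteger next to the queue
instead. A standalone sketch of that pattern (illustration only, not the HBase
classes):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class CountedQueue<E> {
  private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
  private final AtomicInteger count = new AtomicInteger(0);

  void add(E e) {
    queue.add(e);
    count.incrementAndGet();
  }

  E poll() {
    E e = queue.poll();
    if (e != null) {
      count.decrementAndGet();
    }
    return e;
  }

  int size() {
    // Constant time; may briefly lag the queue under concurrent updates.
    return count.get();
  }
}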

http://git-wip-us.apache.org/repos/asf/hbase/blob/c000f29e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
new file mode 100644
index 0000000..788f1a4
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Provide a way to access the inner buffer.
+ * The purpose is to reduce the elapsed time to move a large number
+ * of elements between collections.
+ * @param <T> the type of the elements
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@VisibleForTesting
+interface RowAccess<T> extends Iterable<T> {
+  /**
+   * @return true if there are no elements.
+   */
+  boolean isEmpty();
+
+  /**
+   * @return the number of elements in this list.
+   */
+  int size();
+}
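
The point of RowAccess is that AsyncProcess consumes rows through a live,
removable view of the caller's buffer rather than copying them into a new
collection first. A minimal standalone illustration of that behaviour (plain
JDK classes only, not the HBase API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class LiveViewDemo {
  public static void main(String[] args) {
    List<String> buffer = new ArrayList<>(Arrays.asList("r1", "r2", "r3"));
    Iterator<String> it = buffer.iterator();
    while (it.hasNext()) {
      String taken = it.next();
      it.remove(); // the taken row disappears from the backing buffer
      System.out.println("submitted " + taken);
    }
    System.out.println("left in buffer: " + buffer.size()); // prints 0
  }
}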

http://git-wip-us.apache.org/repos/asf/hbase/blob/c000f29e/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 5959078..516f2cf 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -25,8 +25,11 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
@@ -57,6 +60,12 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
+import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess;
+import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker;
+import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
+import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost;
+import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -65,13 +74,34 @@ import 
org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
-import static org.junit.Assert.assertTrue;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 import org.mockito.Mockito;
+import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker;
+import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @Category({ClientTests.class, MediumTests.class})
 public class TestAsyncProcess {
@@ -181,6 +211,13 @@ public class TestAsyncProcess {
     }
 
     @Override
+    public <Res> AsyncRequestFuture submit(TableName tableName, RowAccess<? 
extends Row> rows,
+        boolean atLeastOne, Callback<Res> callback, boolean needResults)
+            throws InterruptedIOException {
+      // We use results in tests to check things, so override to always save 
them.
+      return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
+    }
+    @Override
     public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends 
Row> rows,
         boolean atLeastOne, Callback<Res> callback, boolean needResults)
             throws InterruptedIOException {
@@ -435,7 +472,186 @@ public class TestAsyncProcess {
       return null;
     }
   }
+  @Test
+  public void testListRowAccess() {
+    int count = 10;
+    List<String> values = new LinkedList<>();
+    for (int i = 0; i != count; ++i) {
+      values.add(String.valueOf(i));
+    }
 
+    ListRowAccess<String> taker = new ListRowAccess(values);
+    assertEquals(count, taker.size());
+
+    int restoreCount = 0;
+    int takeCount = 0;
+    Iterator<String> it = taker.iterator();
+    while (it.hasNext()) {
+      String v = it.next();
+      assertEquals(String.valueOf(takeCount), v);
+      ++takeCount;
+      it.remove();
+      if (Math.random() >= 0.5) {
+        break;
+      }
+    }
+    assertEquals(count, taker.size() + takeCount);
+
+    it = taker.iterator();
+    while (it.hasNext()) {
+      String v = it.next();
+      assertEquals(String.valueOf(takeCount), v);
+      ++takeCount;
+      it.remove();
+    }
+    assertEquals(0, taker.size());
+    assertEquals(count, takeCount);
+  }
+  private static long calculateRequestCount(long putSizePerServer, long 
maxHeapSizePerRequest) {
+    if (putSizePerServer <= maxHeapSizePerRequest) {
+      return 1;
+    } else if (putSizePerServer % maxHeapSizePerRequest == 0) {
+      return putSizePerServer / maxHeapSizePerRequest;
+    } else {
+      return putSizePerServer / maxHeapSizePerRequest + 1;
+    }
+  }
+
+  @Test
+  public void testSubmitSameSizeOfRequest() throws Exception {
+    long writeBuffer = 2 * 1024 * 1024;
+    long putsHeapSize = writeBuffer;
+    doSubmitRequest(writeBuffer, putsHeapSize);
+  }
+  @Test
+  public void testIllegalArgument() throws IOException {
+    ClusterConnection conn = createHConnection();
+    final long maxHeapSizePerRequest = 
conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
+      AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
+    
conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
 -1);
+    try {
+      MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
+      fail("The maxHeapSizePerRequest must be bigger than zero");
+    } catch (IllegalArgumentException e) {
+    }
+    
conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
 maxHeapSizePerRequest);
+  }
+  @Test
+  public void testSubmitLargeRequestWithUnlimitedSize() throws Exception {
+    long maxHeapSizePerRequest = Long.MAX_VALUE;
+    long putsHeapSize = 2 * 1024 * 1024;
+    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
+  }
+
+  @Test(timeout=300000)
+  public void testSubmitRandomSizeRequest() throws Exception {
+    Random rn = new Random();
+    final long limit = 10 * 1024 * 1024;
+    for (int count = 0; count != 2; ++count) {
+      long maxHeapSizePerRequest = Math.max(1, (Math.abs(rn.nextLong()) % 
limit));
+      long putsHeapSize = Math.max(1, (Math.abs(rn.nextLong()) % limit));
+      LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + 
maxHeapSizePerRequest + ", putsHeapSize=" + putsHeapSize);
+      doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
+    }
+  }
+
+  @Test
+  public void testSubmitSmallRequest() throws Exception {
+    long maxHeapSizePerRequest = 2 * 1024 * 1024;
+    long putsHeapSize = 100;
+    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
+  }
+
+  @Test(timeout=120000)
+  public void testSubmitLargeRequest() throws Exception {
+    long maxHeapSizePerRequest = 2 * 1024 * 1024;
+    long putsHeapSize = maxHeapSizePerRequest * 2;
+    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
+  }
+
+  private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) 
throws Exception {
+    ClusterConnection conn = createHConnection();
+    final long defaultHeapSizePerRequest = 
conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
+      AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
+    
conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
 maxHeapSizePerRequest);
+    BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
+
+    // sn has two regions
+    long putSizeSN = 0;
+    long putSizeSN2 = 0;
+    List<Put> puts = new ArrayList<>();
+    while ((putSizeSN + putSizeSN2) <= putsHeapSize) {
+      Put put1 = new Put(DUMMY_BYTES_1);
+      put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
+      Put put2 = new Put(DUMMY_BYTES_2);
+      put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
+      Put put3 = new Put(DUMMY_BYTES_3);
+      put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3);
+      putSizeSN += (put1.heapSize() + put2.heapSize());
+      putSizeSN2 += put3.heapSize();
+      puts.add(put1);
+      puts.add(put2);
+      puts.add(put3);
+    }
+
+    int minCountSnRequest = (int) calculateRequestCount(putSizeSN, 
maxHeapSizePerRequest);
+    int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, 
maxHeapSizePerRequest);
+    LOG.info("Total put count:" + puts.size() + ", putSizeSN:"+ putSizeSN + ", 
putSizeSN2:" + putSizeSN2
+      + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest
+      + ", minCountSnRequest:" + minCountSnRequest
+      + ", minCountSn2Request:" + minCountSn2Request);
+    try (HTable ht = new HTable(conn, bufferParam)) {
+      MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
+      ht.mutator.ap = ap;
+
+      Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
+      ht.put(puts);
+      List<AsyncRequestFuture> reqs = ap.allReqs;
+
+      int actualSnReqCount = 0;
+      int actualSn2ReqCount = 0;
+      for (AsyncRequestFuture req : reqs) {
+        if (!(req instanceof AsyncRequestFutureImpl)) {
+          continue;
+        }
+        AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req;
+        if (ars.getRequestHeapSize().containsKey(sn)) {
+          ++actualSnReqCount;
+        }
+        if (ars.getRequestHeapSize().containsKey(sn2)) {
+          ++actualSn2ReqCount;
+        }
+      }
+      // If the server is busy, the actual count may be larger than the minimum.
+      assertEquals(true, minCountSnRequest <= actualSnReqCount);
+      assertEquals(true, minCountSn2Request <= actualSn2ReqCount);
+      Map<ServerName, Long> sizePerServers = new HashMap<>();
+      for (AsyncRequestFuture req : reqs) {
+        if (!(req instanceof AsyncRequestFutureImpl)) {
+          continue;
+        }
+        AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req;
+        Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
+        for (Map.Entry<ServerName, List<Long>> entry : 
requestHeapSize.entrySet()) {
+          long sum = 0;
+          for (long size : entry.getValue()) {
+            assertEquals(true, size <= maxHeapSizePerRequest);
+            sum += size;
+          }
+          assertEquals(true, sum <= maxHeapSizePerRequest);
+          long value = sizePerServers.containsKey(entry.getKey()) ? 
sizePerServers.get(entry.getKey()) : 0L;
+          sizePerServers.put(entry.getKey(), value + sum);
+        }
+      }
+      assertEquals(true, sizePerServers.containsKey(sn));
+      assertEquals(true, sizePerServers.containsKey(sn2));
+      assertEquals(false, sizePerServers.containsKey(sn3));
+      assertEquals(putSizeSN, (long) sizePerServers.get(sn));
+      assertEquals(putSizeSN2, (long) sizePerServers.get(sn2));
+    }
+    // restore config.
+    
conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
 defaultHeapSizePerRequest);
+  }
   @Test
   public void testSubmit() throws Exception {
     ClusterConnection hc = createHConnection();
@@ -477,7 +693,9 @@ public class TestAsyncProcess {
     List<Put> puts = new ArrayList<Put>();
     puts.add(createPut(1, true));
 
-    ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
+    for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) {
+      ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
+    }
     ap.submit(DUMMY_TABLE, puts, false, null, false);
     Assert.assertEquals(puts.size(), 1);
 
@@ -538,7 +756,7 @@ public class TestAsyncProcess {
   public void testSubmitTrue() throws IOException {
     final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, 
false);
     ap.tasksInProgress.incrementAndGet();
-    final AtomicInteger ai = new AtomicInteger(1);
+    final AtomicInteger ai = new AtomicInteger(ap.maxConcurrentTasksPerRegion);
     ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
 
     final AtomicBoolean checkPoint = new AtomicBoolean(false);
@@ -672,6 +890,8 @@ public class TestAsyncProcess {
     setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
     setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
     setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
+    Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), 
Mockito.anyBoolean(), Mockito.anyBoolean()))
+           .thenReturn(Arrays.asList(loc1, loc2, loc3));
     setMockLocation(hc, FAILS, new RegionLocations(loc2));
     return hc;
   }
@@ -681,6 +901,18 @@ public class TestAsyncProcess {
     setMockLocation(hc, DUMMY_BYTES_1, hrls1);
     setMockLocation(hc, DUMMY_BYTES_2, hrls2);
     setMockLocation(hc, DUMMY_BYTES_3, hrls3);
+    List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
+    for (HRegionLocation loc : hrls1.getRegionLocations()) {
+      locations.add(loc);
+    }
+    for (HRegionLocation loc : hrls2.getRegionLocations()) {
+      locations.add(loc);
+    }
+    for (HRegionLocation loc : hrls3.getRegionLocations()) {
+      locations.add(loc);
+    }
+    Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), 
Mockito.anyBoolean(), Mockito.anyBoolean()))
+           .thenReturn(locations);
     return hc;
   }
 
@@ -688,6 +920,8 @@ public class TestAsyncProcess {
       RegionLocations result) throws IOException {
     Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
         Mockito.anyBoolean(), Mockito.anyBoolean(), 
Mockito.anyInt())).thenReturn(result);
+    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
+        Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result);
   }
 
   private static ClusterConnection createHConnectionCommon() {
@@ -789,6 +1023,195 @@ public class TestAsyncProcess {
   }
 
   @Test
+  public void testTaskCheckerHost() throws IOException {
+    final int maxTotalConcurrentTasks = 100;
+    final int maxConcurrentTasksPerServer = 2;
+    final int maxConcurrentTasksPerRegion = 1;
+    final AtomicLong tasksInProgress = new AtomicLong(0);
+    final Map<ServerName, AtomicInteger> taskCounterPerServer = new 
HashMap<>();
+    final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
+    TaskCountChecker countChecker = new TaskCountChecker(
+      maxTotalConcurrentTasks,
+      maxConcurrentTasksPerServer,
+      maxConcurrentTasksPerRegion,
+      tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
+    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
+    // unlimited
+    RequestSizeChecker sizeChecker = new 
RequestSizeChecker(maxHeapSizePerRequest);
+    RowCheckerHost checkerHost = new 
RowCheckerHost(Arrays.asList(countChecker, sizeChecker));
+
+    ReturnCode loc1Code = checkerHost.canTakeOperation(loc1, 
maxHeapSizePerRequest);
+    assertEquals(RowChecker.ReturnCode.INCLUDE, loc1Code);
+
+    ReturnCode loc1Code_2 = checkerHost.canTakeOperation(loc1, 
maxHeapSizePerRequest);
+    // rejected for size
+    assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc1Code_2);
+
+    ReturnCode loc2Code = checkerHost.canTakeOperation(loc2, 
maxHeapSizePerRequest);
+    // rejected for size
+    assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc2Code);
+
+    // fill the task slots for loc3.
+    taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new 
AtomicInteger(100));
+    taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100));
+
+    ReturnCode loc3Code = checkerHost.canTakeOperation(loc3, 1L);
+    // rejected for count
+    assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc3Code);
+
+    // release the task slots for loc3.
+    taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new 
AtomicInteger(0));
+    taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0));
+
+    ReturnCode loc3Code_2 = checkerHost.canTakeOperation(loc3, 1L);
+    assertEquals(RowChecker.ReturnCode.INCLUDE, loc3Code_2);
+  }
+
+  @Test
+  public void testRequestSizeChecker() throws IOException {
+    final long maxHeapSizePerRequest = 2 * 1024 * 1024;
+    final ClusterConnection conn = createHConnection();
+    RequestSizeChecker checker = new RequestSizeChecker(maxHeapSizePerRequest);
+
+    // inner state is unchanged.
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
+      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
+      code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
+      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
+    }
+
+    // accept the data located on loc1 region.
+    ReturnCode acceptCode = checker.canTakeOperation(loc1, 
maxHeapSizePerRequest);
+    assertEquals(RowChecker.ReturnCode.INCLUDE, acceptCode);
+    checker.notifyFinal(acceptCode, loc1, maxHeapSizePerRequest);
+
+    // the sn server reaches the limit.
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
+      assertNotEquals(RowChecker.ReturnCode.INCLUDE, code);
+      code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
+      assertNotEquals(RowChecker.ReturnCode.INCLUDE, code);
+    }
+
+    // the request to sn2 server should be accepted.
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode code = checker.canTakeOperation(loc3, maxHeapSizePerRequest);
+      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
+    }
+
+    checker.reset();
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
+      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
+      code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
+      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
+    }
+  }
+
+  @Test
+  public void testSubmittedSizeChecker() {
+    final long maxHeapSizeSubmit = 2 * 1024 * 1024;
+    SubmittedSizeChecker checker = new SubmittedSizeChecker(maxHeapSizeSubmit);
+
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode include = checker.canTakeOperation(loc1, 100000);
+      assertEquals(ReturnCode.INCLUDE, include);
+    }
+
+    for (int i = 0; i != 10; ++i) {
+      checker.notifyFinal(ReturnCode.INCLUDE, loc1, maxHeapSizeSubmit);
+    }
+
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode include = checker.canTakeOperation(loc1, 100000);
+      assertEquals(ReturnCode.END, include);
+    }
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode include = checker.canTakeOperation(loc2, 100000);
+      assertEquals(ReturnCode.END, include);
+    }
+    checker.reset();
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode include = checker.canTakeOperation(loc1, 100000);
+      assertEquals(ReturnCode.INCLUDE, include);
+    }
+  }
+  @Test
+  public void testTaskCountChecker() throws InterruptedIOException {
+    long rowSize = 12345;
+    int maxTotalConcurrentTasks = 100;
+    int maxConcurrentTasksPerServer = 2;
+    int maxConcurrentTasksPerRegion = 1;
+    AtomicLong tasksInProgress = new AtomicLong(0);
+    Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
+    Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
+    TaskCountChecker checker = new TaskCountChecker(
+      maxTotalConcurrentTasks,
+      maxConcurrentTasksPerServer,
+      maxConcurrentTasksPerRegion,
+      tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
+
+    // inner state is unchanged.
+    for (int i = 0; i != 10; ++i) {
+      ReturnCode code = checker.canTakeOperation(loc1, rowSize);
+      assertEquals(RowChecker.ReturnCode.INCLUDE, code);
+    }
+    // add loc1 region.
+    ReturnCode code = checker.canTakeOperation(loc1, rowSize);
+    assertEquals(RowChecker.ReturnCode.INCLUDE, code);
+    checker.notifyFinal(code, loc1, rowSize);
+
+    // fill the task slots for loc1.
+    taskCounterPerRegion.put(loc1.getRegionInfo().getRegionName(), new 
AtomicInteger(100));
+    taskCounterPerServer.put(loc1.getServerName(), new AtomicInteger(100));
+
+    // the region was previously accepted, so it must be accepted now.
+    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
+      ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize);
+      assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
+      checker.notifyFinal(includeCode, loc1, rowSize);
+    }
+
+    // fill the task slots for loc3.
+    taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new 
AtomicInteger(100));
+    taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100));
+
+    // no task slots.
+    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
+      ReturnCode excludeCode = checker.canTakeOperation(loc3, rowSize);
+      assertNotEquals(RowChecker.ReturnCode.INCLUDE, excludeCode);
+      checker.notifyFinal(excludeCode, loc3, rowSize);
+    }
+
+    // release the tasks for loc3.
+    taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new 
AtomicInteger(0));
+    taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0));
+
+    // add loc3 region.
+    ReturnCode code3 = checker.canTakeOperation(loc3, rowSize);
+    assertEquals(RowChecker.ReturnCode.INCLUDE, code3);
+    checker.notifyFinal(code3, loc3, rowSize);
+
+    // the region was previously accepted, so it must be accepted now.
+    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
+      ReturnCode includeCode = checker.canTakeOperation(loc3, rowSize);
+      assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
+      checker.notifyFinal(includeCode, loc3, rowSize);
+    }
+
+    checker.reset();
+    // the region was previously accepted,
+    // but the checker has been reset and the task slots for loc1 are full.
+    // So it must be rejected now.
+    for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
+      ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize);
+      assertNotEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
+      checker.notifyFinal(includeCode, loc1, rowSize);
+    }
+  }
+
+  @Test
   public void testBatch() throws IOException, InterruptedException {
     ClusterConnection conn = new MyConnectionImpl(conf);
     HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
@@ -818,7 +1241,6 @@ public class TestAsyncProcess {
     Assert.assertEquals(res[5], success);
     Assert.assertEquals(res[6], failure);
   }
-
   @Test
   public void testErrorsServers() throws IOException {
     Configuration configuration = new Configuration(conf);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c000f29e/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
index e771a92..e84b9e4 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
@@ -69,7 +69,7 @@ import com.google.protobuf.Service;
  * mrmBuilder.addMutationRequest(m1);
  * mrmBuilder.addMutationRequest(m2);
  * CoprocessorRpcChannel channel = t.coprocessorService(ROW);
- * MultiRowMutationService.BlockingInterface service = 
+ * MultiRowMutationService.BlockingInterface service =
  *    MultiRowMutationService.newBlockingStub(channel);
  * MutateRowsRequest mrm = mrmBuilder.build();
  * service.mutateRows(null, mrm);
