This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 32fb393ef39 HBASE-29148: BufferedMutator should be able to flush after buffering a certain number of mutations (#6718)
32fb393ef39 is described below

commit 32fb393ef39883f1c6416ddd74e3110e4698dfa8
Author: Charles Connell <[email protected]>
AuthorDate: Mon Mar 3 11:02:22 2025 -0500

    HBASE-29148: BufferedMutator should be able to flush after buffering a certain number of mutations (#6718)
    
    Signed-off-by: Duo Zhang <[email protected]>
    Signed-off-by: Nick Dimiduk <[email protected]>
---
 .../hadoop/hbase/client/AsyncBufferedMutator.java  |  5 ++
 .../hbase/client/AsyncBufferedMutatorBuilder.java  |  7 ++
 .../client/AsyncBufferedMutatorBuilderImpl.java    | 12 +++-
 .../hbase/client/AsyncBufferedMutatorImpl.java     | 19 +++++-
 .../hbase/client/AsyncConnectionConfiguration.java |  7 ++
 .../hadoop/hbase/client/BufferedMutator.java       |  8 +++
 .../hadoop/hbase/client/BufferedMutatorImpl.java   | 12 +++-
 .../hadoop/hbase/client/BufferedMutatorParams.java | 19 ++++++
 .../hbase/client/ConnectionConfiguration.java      | 12 ++++
 .../hbase/client/ConnectionImplementation.java     |  3 +
 .../hbase/client/TestBufferedMutatorParams.java    |  6 +-
 .../hbase/client/TestAsyncBufferMutator.java       | 24 ++++++-
 .../hadoop/hbase/client/TestBufferedMutator2.java  | 79 ++++++++++++++++++++++
 13 files changed, 204 insertions(+), 9 deletions(-)
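
For readers skimming the diff, here is a minimal usage sketch of the synchronous path this change enables. It is not part of the commit; the table name, column family, qualifier, and the limit of 100 are illustrative assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.BufferedMutatorParams;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MaxMutationsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
          BufferedMutator mutator = conn.getBufferedMutator(
            new BufferedMutatorParams(TableName.valueOf("example-table")).setMaxMutations(100))) {
          for (int i = 0; i < 1000; i++) {
            // With setMaxMutations(100), every 100 buffered mutations trigger an
            // automatic flush, keeping each Multi request small.
            mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(Bytes.toBytes("cf"),
              Bytes.toBytes("cq"), Bytes.toBytes("value-" + i)));
          }
        } // close() flushes anything still buffered.
      }
    }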

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
index e5f28d2e060..27e11219c03 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
@@ -87,6 +87,11 @@ public interface AsyncBufferedMutator extends Closeable {
    */
   long getWriteBufferSize();
 
+  /**
+   * The maximum number of mutations that this buffered mutator will buffer before flushing them
+   */
+  int getMaxMutations();
+
   /**
    * Returns the periodical flush interval, 0 means disabled.
    */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
index ed21fb8e23e..11aac4bd1d3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -103,6 +103,13 @@ public interface AsyncBufferedMutatorBuilder {
    */
   AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);
 
+  /**
+   * Set the maximum number of mutations that this buffered mutator will buffer before flushing
+   * them. If you are talking to a cluster that uses hbase.rpc.rows.size.threshold.reject to reject
+   * large Multi requests, you may need this setting to avoid rejections. Default is no limit.
+   */
+  AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations);
+
   /**
    * Create the {@link AsyncBufferedMutator} instance.
    */
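
A corresponding sketch for the asynchronous builder above (not part of the commit). The connection setup, table name, and the limit of 100 are assumptions; the AsyncConnection would typically come from ConnectionFactory.createAsyncConnection(conf).get().

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.Put;

    public class AsyncMaxMutationsExample {
      static void writeBatch(AsyncConnection conn, List<Put> puts) throws IOException {
        try (AsyncBufferedMutator mutator = conn
          .getBufferedMutatorBuilder(TableName.valueOf("example-table"))
          .setMaxMutations(100) // flush automatically once 100 mutations are buffered
          .build()) {
          // mutate(List) returns one future per mutation; waiting on all of them
          // confirms every buffered write has been flushed and acknowledged.
          List<CompletableFuture<Void>> futures = mutator.mutate(puts);
          CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
      }
    }
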
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index ede5b359e83..698947c8754 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -39,12 +39,15 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
 
   private int maxKeyValueSize;
 
+  private int maxMutations;
+
   public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
     AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
     this.tableBuilder = tableBuilder;
     this.writeBufferSize = connConf.getWriteBufferSize();
    this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs();
     this.maxKeyValueSize = connConf.getMaxKeyValueSize();
+    this.maxMutations = connConf.getBufferedMutatorMaxMutations();
     this.periodicalFlushTimer = periodicalFlushTimer;
   }
 
@@ -100,9 +103,16 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
     return this;
   }
 
+  @Override
+  public AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations) {
+    Preconditions.checkArgument(maxMutations > 0, "maxMutations %d must be > 0", maxMutations);
+    this.maxMutations = maxMutations;
+    return this;
+  }
+
   @Override
   public AsyncBufferedMutator build() {
    return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize,
-      periodicFlushTimeoutNs, maxKeyValueSize);
+      periodicFlushTimeoutNs, maxKeyValueSize, maxMutations);
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index ce4193d9138..9f74323785c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -31,6 +31,8 @@ import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@@ -41,6 +43,8 @@ import org.apache.hbase.thirdparty.io.netty.util.Timeout;
 @InterfaceAudience.Private
 class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);
+
   private final HashedWheelTimer periodicalFlushTimer;
 
   private final AsyncTable<?> table;
@@ -51,6 +55,8 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
   private final int maxKeyValueSize;
 
+  private final int maxMutations;
+
   private List<Mutation> mutations = new ArrayList<>();
 
   private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -62,12 +68,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
   Timeout periodicFlushTask;
 
  AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
-    long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
+    long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutations) {
     this.periodicalFlushTimer = periodicalFlushTimer;
     this.table = table;
     this.writeBufferSize = writeBufferSize;
     this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
     this.maxKeyValueSize = maxKeyValueSize;
+    this.maxMutations = maxMutations;
   }
 
   @Override
@@ -144,6 +151,10 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
       this.futures.addAll(futures);
       bufferedSize += heapSize;
       if (bufferedSize >= writeBufferSize) {
+        LOG.trace("Flushing because write buffer size {} reached", writeBufferSize);
+        internalFlush();
+      } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
+        LOG.trace("Flushing because max mutations {} reached", maxMutations);
         internalFlush();
       }
     }
@@ -170,4 +181,10 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
   public long getPeriodicalFlushTimeout(TimeUnit unit) {
     return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
   }
+
+  @Override
+  public int getMaxMutations() {
+    return maxMutations;
+  }
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 5fb95ebbd87..14bc0598d84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -101,6 +101,8 @@ class AsyncConnectionConfiguration {
 
   private final int maxKeyValueSize;
 
+  private final int bufferedMutatorMaxMutations;
+
   AsyncConnectionConfiguration(Configuration conf) {
     ConnectionConfiguration connectionConf = new ConnectionConfiguration(conf);
 
@@ -111,6 +113,7 @@ class AsyncConnectionConfiguration {
    this.writeBufferPeriodicFlushTimeoutNs = connectionConf.getWriteBufferPeriodicFlushTimeoutMs();
     this.maxKeyValueSize = connectionConf.getMaxKeyValueSize();
     this.maxRetries = connectionConf.getRetriesNumber();
+    this.bufferedMutatorMaxMutations = connectionConf.getBufferedMutatorMaxMutations();
 
     // fields from connection configuration that need to be converted to nanos
     this.metaOperationTimeoutNs =
@@ -229,4 +232,8 @@ class AsyncConnectionConfiguration {
   int getMaxKeyValueSize() {
     return maxKeyValueSize;
   }
+
+  int getBufferedMutatorMaxMutations() {
+    return bufferedMutatorMaxMutations;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index 7c95643ecc9..84a39ec3802 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -194,6 +194,14 @@ public interface BufferedMutator extends Closeable {
       "The BufferedMutator::setOperationTimeout has not been implemented");
   }
 
+  /**
+   * The maximum number of mutations that this buffered mutator will buffer before flushing them
+   */
+  default int getMaxMutations() {
+    throw new UnsupportedOperationException(
+      "The BufferedMutator::getMaxMutations has not been implemented");
+  }
+
   /**
    * Listens for asynchronous exceptions on a {@link BufferedMutator}.
    */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index e0795c7c4c6..98ce2dfa625 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -86,6 +86,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
   private Timer writeBufferPeriodicFlushTimer = null;
 
   private final int maxKeyValueSize;
+  private final int maxMutations;
   private final ExecutorService pool;
   private final AtomicInteger rpcTimeout;
   private final AtomicInteger operationTimeout;
@@ -128,6 +129,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
       ? params.getMaxKeyValueSize()
       : tableConf.getMaxKeyValueSize();
 
+    this.maxMutations = params.getMaxMutations() != UNSET
+      ? params.getMaxMutations()
+      : conn.getConnectionConfiguration().getBufferedMutatorMaxMutations();
+
     this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != UNSET
       ? params.getRpcTimeout()
       : conn.getConnectionConfiguration().getWriteRpcTimeout());
@@ -280,8 +285,11 @@ public class BufferedMutatorImpl implements BufferedMutator {
     throws InterruptedIOException, RetriesExhaustedWithDetailsException {
     List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();
     while (true) {
-      if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) {
-        // There is the room to accept more mutations.
+      if (
+        !flushAll && (currentWriteBufferSize.get() <= writeBufferSize)
+          && (maxMutations == UNSET || size() < maxMutations)
+      ) {
+        // There is room to accept more mutations.
         break;
       }
       AsyncRequestFuture asf;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index ee614716f86..ecc7d6e0031 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -38,6 +38,7 @@ public class BufferedMutatorParams implements Cloneable {
   private String implementationClassName = null;
   private int rpcTimeout = UNSET;
   private int operationTimeout = UNSET;
+  private int maxMutations = UNSET;
  private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
     @Override
     public void onException(RetriesExhaustedWithDetailsException exception,
@@ -85,6 +86,23 @@ public class BufferedMutatorParams implements Cloneable {
     return operationTimeout;
   }
 
+  /**
+   * Set the maximum number of mutations that this buffered mutator will buffer before flushing
+   * them. If you are talking to a cluster that uses hbase.rpc.rows.size.threshold.reject to reject
+   * large Multi requests, you may need this setting to avoid rejections. Default is no limit.
+   */
+  public BufferedMutatorParams setMaxMutations(int maxMutations) {
+    this.maxMutations = maxMutations;
+    return this;
+  }
+
+  /**
+   * The maximum number of mutations that this buffered mutator will buffer before flushing them
+   */
+  public int getMaxMutations() {
+    return maxMutations;
+  }
+
   /**
   * Override the write buffer size specified by the provided {@link Connection}'s
   * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
@@ -188,6 +206,7 @@ public class BufferedMutatorParams implements Cloneable {
     clone.writeBufferPeriodicFlushTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs;
     clone.writeBufferPeriodicFlushTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
     clone.maxKeyValueSize = this.maxKeyValueSize;
+    clone.maxMutations = this.maxMutations;
     clone.pool = this.pool;
     clone.listener = this.listener;
     clone.implementationClassName = this.implementationClassName;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 93fa2600d89..be118857fc6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -47,6 +47,9 @@ public class ConnectionConfiguration {
  public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
  public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
   public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
+  public static final String BUFFERED_MUTATOR_MAX_MUTATIONS_KEY =
+    "hbase.client.write.buffer.maxmutations";
+  public static final int BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT = -1;
   public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
     "hbase.client.primaryCallTimeout.get";
  public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms
@@ -88,6 +91,7 @@ public class ConnectionConfiguration {
   private final int metaReplicaCallTimeoutMicroSecondScan;
   private final int retries;
   private final int maxKeyValueSize;
+  private final int bufferedMutatorMaxMutations;
   private final int rpcTimeout;
   private final int readRpcTimeout;
   private final int metaReadRpcTimeout;
@@ -143,6 +147,9 @@ public class ConnectionConfiguration {
 
    this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
 
+    this.bufferedMutatorMaxMutations =
+      conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY, BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT);
+
     this.rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
 
@@ -193,6 +200,7 @@ public class ConnectionConfiguration {
     this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
    this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
     this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
+    this.bufferedMutatorMaxMutations = BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT;
     this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
     this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
     this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
@@ -259,6 +267,10 @@ public class ConnectionConfiguration {
     return maxKeyValueSize;
   }
 
+  public int getBufferedMutatorMaxMutations() {
+    return bufferedMutatorMaxMutations;
+  }
+
   public long getScannerMaxResultSize() {
     return scannerMaxResultSize;
   }
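
The new key can also be set once on the client Configuration (or in hbase-site.xml), in which case it becomes the connection-wide default read by the ConnectionConfiguration and AsyncConnectionConfiguration changes in this commit. A minimal sketch, with the value 100 chosen purely for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class MaxMutationsConfigExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // The default of -1 means "no limit"; any positive value caps how many
        // mutations a BufferedMutator buffers before it flushes automatically.
        conf.setInt("hbase.client.write.buffer.maxmutations", 100);
        // Connections built from this conf (ConnectionFactory.createConnection(conf)
        // or ConnectionFactory.createAsyncConnection(conf)) apply this default to
        // every buffered mutator they create, unless overridden per mutator.
      }
    }
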
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index eef82643b5b..a85753fcc79 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -485,6 +485,9 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
     if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
       params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
     }
+    if (params.getMaxMutations() == BufferedMutatorParams.UNSET) {
+      params.setMaxMutations(connectionConfig.getBufferedMutatorMaxMutations());
+    }
     // Look to see if an alternate BufferedMutation implementation is wanted.
     // Look in params and in config. If null, use default.
     String implementationClassName = params.getImplementationClassName();
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
index ba23d105393..fdc7c305500 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
@@ -140,8 +140,8 @@ public class TestBufferedMutatorParams {
 
     BufferedMutator.ExceptionListener listener = new MockExceptionListener();
     bmp.writeBufferSize(17).setWriteBufferPeriodicFlushTimeoutMs(123)
-      .setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).pool(pool)
-      .listener(listener);
+      .setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).setMaxMutations(3737)
+      .pool(pool).listener(listener);
     bmp.implementationClassName("someClassName");
     BufferedMutatorParams clone = bmp.clone();
 
@@ -151,6 +151,7 @@ public class TestBufferedMutatorParams {
     assertEquals(123, clone.getWriteBufferPeriodicFlushTimeoutMs());
     assertEquals(456, clone.getWriteBufferPeriodicFlushTimerTickMs());
     assertEquals(13, clone.getMaxKeyValueSize());
+    assertEquals(3737, clone.getMaxMutations());
     assertEquals("someClassName", clone.getImplementationClassName());
 
     cloneTest(bmp, clone);
@@ -178,6 +179,7 @@ public class TestBufferedMutatorParams {
     assertEquals(some.getWriteBufferPeriodicFlushTimerTickMs(),
       clone.getWriteBufferPeriodicFlushTimerTickMs());
     assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize());
+    assertTrue(some.getMaxMutations() == clone.getMaxMutations());
     assertTrue(some.getListener() == clone.getListener());
     assertTrue(some.getPool() == clone.getPool());
    assertEquals(some.getImplementationClassName(), clone.getImplementationClassName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index b479d4de573..2802c77b5dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -175,6 +175,23 @@ public class TestAsyncBufferMutator {
    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
   }
 
+  @Test
+  public void testMaxMutationsFlush() throws InterruptedException, ExecutionException {
+    AsyncBufferedMutator mutator =
+      CONN.getBufferedMutatorBuilder(TABLE_NAME).setMaxMutations(3).build();
+    CompletableFuture<?> future1 =
+      mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+    CompletableFuture<?> future2 =
+      mutator.mutate(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, VALUE));
+    CompletableFuture<?> future3 =
+      mutator.mutate(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, VALUE));
+    CompletableFuture.allOf(future1, future2, future3).join();
+    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(1))).get().getValue(CF, CQ));
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(2))).get().getValue(CF, CQ));
+  }
+
   // a bit deep into the implementation
   @Test
  public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
@@ -244,8 +261,9 @@ public class TestAsyncBufferMutator {
     private int flushCount;
 
    AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
-      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
-      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize);
+      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutation) {
+      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize,
+        maxMutation);
     }
 
     @Override
@@ -261,7 +279,7 @@ public class TestAsyncBufferMutator {
     Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
     try (AsyncBufferMutatorForTest mutator =
      new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
-        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024)) {
+        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024, 100)) {
       CompletableFuture<?> future = mutator.mutate(put);
       Timeout task = mutator.periodicFlushTask;
       // we should have scheduled a periodic flush task
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator2.java
new file mode 100644
index 00000000000..2dd2057b8c0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator2.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestBufferedMutator2 {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBufferedMutator2.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("example-table");
+
+  private static byte[] CF = Bytes.toBytes("cf");
+  private static byte[] CQ = Bytes.toBytes("cq");
+  private static byte[] VALUE = new byte[1024];
+
+  private static Connection CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, CF);
+    CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    Bytes.random(VALUE);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMaxMutationsFlush() throws IOException {
+    BufferedMutator mutator =
+      CONN.getBufferedMutator(new BufferedMutatorParams(TABLE_NAME).setMaxMutations(3));
+    mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+    mutator.mutate(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, VALUE));
+    mutator.mutate(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, VALUE));
+    Table table = CONN.getTable(TABLE_NAME);
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ));
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(1))).getValue(CF, CQ));
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(2))).getValue(CF, CQ));
+  }
+
+}
