This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 1ae4fb42c78 HBASE-29148: BufferedMutator should be able to flush after buffering a certain number of mutations (#6718)
1ae4fb42c78 is described below

commit 1ae4fb42c785da3cec4d90db71f44098be4cf0df
Author: Charles Connell <[email protected]>
AuthorDate: Thu Feb 27 09:49:11 2025 -0500

    HBASE-29148: BufferedMutator should be able to flush after buffering a certain number of mutations (#6718)
    
    Signed-off-by: Duo Zhang <[email protected]>
    Signed-off-by: Nick Dimiduk <[email protected]>
---
 .../hadoop/hbase/client/AsyncBufferedMutator.java  |  5 +++++
 .../hbase/client/AsyncBufferedMutatorBuilder.java  |  7 +++++++
 .../client/AsyncBufferedMutatorBuilderImpl.java    | 12 ++++++++++-
 .../hbase/client/AsyncBufferedMutatorImpl.java     | 18 +++++++++++++++-
 .../hbase/client/AsyncConnectionConfiguration.java | 10 +++++++++
 .../hadoop/hbase/client/BufferedMutator.java       |  8 ++++++++
 .../hadoop/hbase/client/BufferedMutatorParams.java | 19 +++++++++++++++++
 .../hbase/client/ConnectionConfiguration.java      | 12 +++++++++++
 .../client/ConnectionOverAsyncConnection.java      |  3 +++
 .../hbase/client/TestBufferedMutatorParams.java    |  6 ++++--
 .../hbase/client/TestAsyncBufferMutator.java       | 24 +++++++++++++++++++---
 11 files changed, 117 insertions(+), 7 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
index 6cc2b5adf9d..479446f8ea1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
@@ -88,6 +88,11 @@ public interface AsyncBufferedMutator extends Closeable {
    */
   long getWriteBufferSize();
 
+  /**
+   * The maximum number of mutations that this buffered mutator will buffer 
before flushing them
+   */
+  int getMaxMutations();
+
   /**
    * Returns the periodical flush interval, 0 means disabled.
    */
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
index d38aa625fb2..57c609ebb03 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -114,6 +114,13 @@ public interface AsyncBufferedMutatorBuilder {
    */
   AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);
 
+  /**
+   * Set the maximum number of mutations that this buffered mutator will 
buffer before flushing
+   * them. If you are talking to a cluster that uses 
hbase.rpc.rows.size.threshold.reject to reject
+   * large Multi requests, you may need this setting to avoid rejections. 
Default is no limit.
+   */
+  AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations);
+
   /**
    * Create the {@link AsyncBufferedMutator} instance.
    */
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index 6905ff3065c..7fa860dc3d4 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -40,12 +40,15 @@ class AsyncBufferedMutatorBuilderImpl implements 
AsyncBufferedMutatorBuilder {
 
   private int maxKeyValueSize;
 
+  private int maxMutations;
+
   public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
     AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
     this.tableBuilder = tableBuilder;
     this.writeBufferSize = connConf.getWriteBufferSize();
     this.periodicFlushTimeoutNs = 
connConf.getWriteBufferPeriodicFlushTimeoutNs();
     this.maxKeyValueSize = connConf.getMaxKeyValueSize();
+    this.maxMutations = connConf.getBufferedMutatorMaxMutations();
     this.periodicalFlushTimer = periodicalFlushTimer;
   }
 
@@ -115,9 +118,16 @@ class AsyncBufferedMutatorBuilderImpl implements 
AsyncBufferedMutatorBuilder {
     return this;
   }
 
+  @Override
+  public AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations) {
+    Preconditions.checkArgument(maxMutations > 0, "maxMutations %d must be > 
0", maxMutations);
+    this.maxMutations = maxMutations;
+    return this;
+  }
+
   @Override
   public AsyncBufferedMutator build() {
     return new AsyncBufferedMutatorImpl(periodicalFlushTimer, 
tableBuilder.build(), writeBufferSize,
-      periodicFlushTimeoutNs, maxKeyValueSize);
+      periodicFlushTimeoutNs, maxKeyValueSize, maxMutations);
   }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index 3acd8bebdad..0e7e1b91e44 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -32,6 +32,8 @@ import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@@ -42,6 +44,8 @@ import org.apache.hbase.thirdparty.io.netty.util.Timeout;
 @InterfaceAudience.Private
 class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);
+
   private final HashedWheelTimer periodicalFlushTimer;
 
   private final AsyncTable<?> table;
@@ -52,6 +56,8 @@ class AsyncBufferedMutatorImpl implements 
AsyncBufferedMutator {
 
   private final int maxKeyValueSize;
 
+  private final int maxMutations;
+
   private List<Mutation> mutations = new ArrayList<>();
 
   private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -63,12 +69,13 @@ class AsyncBufferedMutatorImpl implements 
AsyncBufferedMutator {
   Timeout periodicFlushTask;
 
   AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, 
AsyncTable<?> table,
-    long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
+    long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, 
int maxMutations) {
     this.periodicalFlushTimer = periodicalFlushTimer;
     this.table = table;
     this.writeBufferSize = writeBufferSize;
     this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
     this.maxKeyValueSize = maxKeyValueSize;
+    this.maxMutations = maxMutations;
   }
 
   @Override
@@ -145,6 +152,10 @@ class AsyncBufferedMutatorImpl implements 
AsyncBufferedMutator {
       this.futures.addAll(futures);
       bufferedSize += heapSize;
       if (bufferedSize >= writeBufferSize) {
+        LOG.trace("Flushing because write buffer size {} reached", 
writeBufferSize);
+        internalFlush();
+      } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
+        LOG.trace("Flushing because max mutations {} reached", maxMutations);
         internalFlush();
       }
     }
@@ -172,6 +183,11 @@ class AsyncBufferedMutatorImpl implements 
AsyncBufferedMutator {
     return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
   }
 
+  @Override
+  public int getMaxMutations() {
+    return maxMutations;
+  }
+
   @Override
   public Map<String, byte[]> getRequestAttributes() {
     return table.getRequestAttributes();
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 5dc9f6d3b41..05fe89ae237 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -38,6 +38,8 @@ import static 
org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
+import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT;
+import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_KEY;
 import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
 import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
 import static 
org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
@@ -148,6 +150,8 @@ class AsyncConnectionConfiguration {
 
   private final int maxKeyValueSize;
 
+  private final int bufferedMutatorMaxMutations;
+
   AsyncConnectionConfiguration(Configuration conf) {
     long operationTimeoutMs =
       conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, 
DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
@@ -200,6 +204,8 @@ class AsyncConnectionConfiguration {
       
TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
         HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
     this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, 
MAX_KEYVALUE_SIZE_DEFAULT);
+    this.bufferedMutatorMaxMutations =
+      conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY, 
BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT);
   }
 
   long getMetaOperationTimeoutNs() {
@@ -285,4 +291,8 @@ class AsyncConnectionConfiguration {
   int getMaxKeyValueSize() {
     return maxKeyValueSize;
   }
+
+  int getBufferedMutatorMaxMutations() {
+    return bufferedMutatorMaxMutations;
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index 24563367bbb..63d31cee088 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -213,6 +213,14 @@ public interface BufferedMutator extends Closeable {
     return Collections.emptyMap();
   }
 
+  /**
+   * The maximum number of mutations that this buffered mutator will buffer 
before flushing them
+   */
+  default int getMaxMutations() {
+    throw new UnsupportedOperationException(
+      "The BufferedMutator::getMaxMutations has not been implemented");
+  }
+
   /**
    * Listens for asynchronous exceptions on a {@link BufferedMutator}.
    */
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index 44bc5e2be7c..88cc062b5bc 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -41,6 +41,7 @@ public class BufferedMutatorParams implements Cloneable {
   private String implementationClassName = null;
   private int rpcTimeout = UNSET;
   private int operationTimeout = UNSET;
+  private int maxMutations = UNSET;
   protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
   private BufferedMutator.ExceptionListener listener = new 
BufferedMutator.ExceptionListener() {
     @Override
@@ -89,6 +90,23 @@ public class BufferedMutatorParams implements Cloneable {
     return operationTimeout;
   }
 
+  /**
+   * Set the maximum number of mutations that this buffered mutator will 
buffer before flushing
+   * them. If you are talking to a cluster that uses 
hbase.rpc.rows.size.threshold.reject to reject
+   * large Multi requests, you may need this setting to avoid rejections. 
Default is no limit.
+   */
+  public BufferedMutatorParams setMaxMutations(int maxMutations) {
+    this.maxMutations = maxMutations;
+    return this;
+  }
+
+  /**
+   * The maximum number of mutations that this buffered mutator will buffer 
before flushing them
+   */
+  public int getMaxMutations() {
+    return maxMutations;
+  }
+
   public BufferedMutatorParams setRequestAttribute(String key, byte[] value) {
     if (requestAttributes.isEmpty()) {
       requestAttributes = new HashMap<>();
@@ -222,6 +240,7 @@ public class BufferedMutatorParams implements Cloneable {
     clone.writeBufferPeriodicFlushTimeoutMs = 
this.writeBufferPeriodicFlushTimeoutMs;
     clone.writeBufferPeriodicFlushTimerTickMs = 
this.writeBufferPeriodicFlushTimerTickMs;
     clone.maxKeyValueSize = this.maxKeyValueSize;
+    clone.maxMutations = this.maxMutations;
     clone.pool = this.pool;
     clone.listener = this.listener;
     clone.implementationClassName = this.implementationClassName;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 56f3a65c957..068f0e459a2 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -41,6 +41,9 @@ public class ConnectionConfiguration {
   public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 
1000L; // 1 second
   public static final String MAX_KEYVALUE_SIZE_KEY = 
"hbase.client.keyvalue.maxsize";
   public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
+  public static final String BUFFERED_MUTATOR_MAX_MUTATIONS_KEY =
+    "hbase.client.write.buffer.maxmutations";
+  public static final int BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT = -1;
   public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
     "hbase.client.primaryCallTimeout.get";
   public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 
10ms
@@ -66,6 +69,7 @@ public class ConnectionConfiguration {
   private final int metaReplicaCallTimeoutMicroSecondScan;
   private final int retries;
   private final int maxKeyValueSize;
+  private final int bufferedMutatorMaxMutations;
   private final int rpcTimeout;
   private final int readRpcTimeout;
   private final int metaReadRpcTimeout;
@@ -117,6 +121,9 @@ public class ConnectionConfiguration {
 
     this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, 
MAX_KEYVALUE_SIZE_DEFAULT);
 
+    this.bufferedMutatorMaxMutations =
+      conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY, 
BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT);
+
     this.rpcTimeout =
       conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
 
@@ -148,6 +155,7 @@ public class ConnectionConfiguration {
     this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
     this.clientScannerAsyncPrefetch = 
Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
     this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
+    this.bufferedMutatorMaxMutations = BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT;
     this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
     this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
     this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
@@ -210,6 +218,10 @@ public class ConnectionConfiguration {
     return maxKeyValueSize;
   }
 
+  public int getBufferedMutatorMaxMutations() {
+    return bufferedMutatorMaxMutations;
+  }
+
   public long getScannerMaxResultSize() {
     return scannerMaxResultSize;
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
index d299e453266..471cfa87445 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
@@ -107,6 +107,9 @@ class ConnectionOverAsyncConnection implements Connection {
     if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
       builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
     }
+    if (params.getMaxMutations() != BufferedMutatorParams.UNSET) {
+      builder.setMaxMutations(params.getMaxMutations());
+    }
     if (!params.getRequestAttributes().isEmpty()) {
 
       builder.setRequestAttributes(params.getRequestAttributes());
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
index ba23d105393..fdc7c305500 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
@@ -140,8 +140,8 @@ public class TestBufferedMutatorParams {
 
     BufferedMutator.ExceptionListener listener = new MockExceptionListener();
     bmp.writeBufferSize(17).setWriteBufferPeriodicFlushTimeoutMs(123)
-      
.setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).pool(pool)
-      .listener(listener);
+      
.setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).setMaxMutations(3737)
+      .pool(pool).listener(listener);
     bmp.implementationClassName("someClassName");
     BufferedMutatorParams clone = bmp.clone();
 
@@ -151,6 +151,7 @@ public class TestBufferedMutatorParams {
     assertEquals(123, clone.getWriteBufferPeriodicFlushTimeoutMs());
     assertEquals(456, clone.getWriteBufferPeriodicFlushTimerTickMs());
     assertEquals(13, clone.getMaxKeyValueSize());
+    assertEquals(3737, clone.getMaxMutations());
     assertEquals("someClassName", clone.getImplementationClassName());
 
     cloneTest(bmp, clone);
@@ -178,6 +179,7 @@ public class TestBufferedMutatorParams {
     assertEquals(some.getWriteBufferPeriodicFlushTimerTickMs(),
       clone.getWriteBufferPeriodicFlushTimerTickMs());
     assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize());
+    assertTrue(some.getMaxMutations() == clone.getMaxMutations());
     assertTrue(some.getListener() == clone.getListener());
     assertTrue(some.getPool() == clone.getPool());
     assertEquals(some.getImplementationClassName(), 
clone.getImplementationClassName());
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index acfa25fecac..08e68def2c4 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -175,6 +175,23 @@ public class TestAsyncBufferMutator {
     assertArrayEquals(VALUE, table.get(new 
Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
   }
 
+  @Test
+  public void testMaxMutationsFlush() throws InterruptedException, 
ExecutionException {
+    AsyncBufferedMutator mutator =
+      CONN.getBufferedMutatorBuilder(TABLE_NAME).setMaxMutations(3).build();
+    CompletableFuture<?> future1 =
+      mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+    CompletableFuture<?> future2 =
+      mutator.mutate(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, VALUE));
+    CompletableFuture<?> future3 =
+      mutator.mutate(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, VALUE));
+    CompletableFuture.allOf(future1, future2, future3).join();
+    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+    assertArrayEquals(VALUE, table.get(new 
Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+    assertArrayEquals(VALUE, table.get(new 
Get(Bytes.toBytes(1))).get().getValue(CF, CQ));
+    assertArrayEquals(VALUE, table.get(new 
Get(Bytes.toBytes(2))).get().getValue(CF, CQ));
+  }
+
   // a bit deep into the implementation
   @Test
   public void testCancelPeriodicFlush() throws InterruptedException, 
ExecutionException {
@@ -244,8 +261,9 @@ public class TestAsyncBufferMutator {
     private int flushCount;
 
     AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, 
AsyncTable<?> table,
-      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
-      super(periodicalFlushTimer, table, writeBufferSize, 
periodicFlushTimeoutNs, maxKeyValueSize);
+      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, 
int maxMutation) {
+      super(periodicalFlushTimer, table, writeBufferSize, 
periodicFlushTimeoutNs, maxKeyValueSize,
+        maxMutation);
     }
 
     @Override
@@ -261,7 +279,7 @@ public class TestAsyncBufferMutator {
     Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
     try (AsyncBufferMutatorForTest mutator =
       new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, 
CONN.getTable(TABLE_NAME),
-        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024)) 
{
+        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024, 
100)) {
       CompletableFuture<?> future = mutator.mutate(put);
       Timeout task = mutator.periodicFlushTask;
       // we should have scheduled a periodic flush task

Reply via email to