This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 1ae4fb42c78 HBASE-29148: BufferedMutator should be able to flush after buffering a certain number of mutations (#6718)
1ae4fb42c78 is described below
commit 1ae4fb42c785da3cec4d90db71f44098be4cf0df
Author: Charles Connell <[email protected]>
AuthorDate: Thu Feb 27 09:49:11 2025 -0500
HBASE-29148: BufferedMutator should be able to flush after buffering a certain number of mutations (#6718)
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Nick Dimiduk <[email protected]>
---
.../hadoop/hbase/client/AsyncBufferedMutator.java | 5 +++++
.../hbase/client/AsyncBufferedMutatorBuilder.java | 7 +++++++
.../client/AsyncBufferedMutatorBuilderImpl.java | 12 ++++++++++-
.../hbase/client/AsyncBufferedMutatorImpl.java | 18 +++++++++++++++-
.../hbase/client/AsyncConnectionConfiguration.java | 10 +++++++++
.../hadoop/hbase/client/BufferedMutator.java | 8 ++++++++
.../hadoop/hbase/client/BufferedMutatorParams.java | 19 +++++++++++++++++
.../hbase/client/ConnectionConfiguration.java | 12 +++++++++++
.../client/ConnectionOverAsyncConnection.java | 3 +++
.../hbase/client/TestBufferedMutatorParams.java | 6 ++++--
.../hbase/client/TestAsyncBufferMutator.java | 24 +++++++++++++++++++---
11 files changed, 117 insertions(+), 7 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
index 6cc2b5adf9d..479446f8ea1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
@@ -88,6 +88,11 @@ public interface AsyncBufferedMutator extends Closeable {
*/
long getWriteBufferSize();
+ /**
+ * The maximum number of mutations that this buffered mutator will buffer before flushing them
+ */
+ int getMaxMutations();
+
/**
* Returns the periodical flush interval, 0 means disabled.
*/
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
index d38aa625fb2..57c609ebb03 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -114,6 +114,13 @@ public interface AsyncBufferedMutatorBuilder {
*/
AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);
+ /**
+ * Set the maximum number of mutations that this buffered mutator will buffer before flushing
+ * them. If you are talking to a cluster that uses hbase.rpc.rows.size.threshold.reject to reject
+ * large Multi requests, you may need this setting to avoid rejections. Default is no limit.
+ */
+ AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations);
+
/**
* Create the {@link AsyncBufferedMutator} instance.
*/
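
A minimal usage sketch of the new async builder option follows. It is illustrative only: the connection, table name, and column coordinates are assumptions for the example, not part of this patch.

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    class MaxMutationsBuilderSketch {
      // Illustrative: "my_table" and the column family/qualifier are placeholders.
      static void writeWithCap(AsyncConnection conn) throws IOException {
        // Flush automatically once 100 mutations are buffered, in addition to the
        // existing size-based and periodic flush triggers.
        AsyncBufferedMutator mutator = conn.getBufferedMutatorBuilder(TableName.valueOf("my_table"))
          .setMaxMutations(100)
          .build();
        mutator.mutate(new Put(Bytes.toBytes("row-1"))
          .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
        mutator.close(); // flushes anything still buffered
      }
    }
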
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index 6905ff3065c..7fa860dc3d4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -40,12 +40,15 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
private int maxKeyValueSize;
+ private int maxMutations;
+
public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
this.tableBuilder = tableBuilder;
this.writeBufferSize = connConf.getWriteBufferSize();
this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs();
this.maxKeyValueSize = connConf.getMaxKeyValueSize();
+ this.maxMutations = connConf.getBufferedMutatorMaxMutations();
this.periodicalFlushTimer = periodicalFlushTimer;
}
@@ -115,9 +118,16 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
return this;
}
+ @Override
+ public AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations) {
+ Preconditions.checkArgument(maxMutations > 0, "maxMutations %d must be > 0", maxMutations);
+ this.maxMutations = maxMutations;
+ return this;
+ }
+
@Override
public AsyncBufferedMutator build() {
return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize,
- periodicFlushTimeoutNs, maxKeyValueSize);
+ periodicFlushTimeoutNs, maxKeyValueSize, maxMutations);
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index 3acd8bebdad..0e7e1b91e44 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -32,6 +32,8 @@ import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@@ -42,6 +44,8 @@ import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@InterfaceAudience.Private
class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);
+
private final HashedWheelTimer periodicalFlushTimer;
private final AsyncTable<?> table;
@@ -52,6 +56,8 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
private final int maxKeyValueSize;
+ private final int maxMutations;
+
private List<Mutation> mutations = new ArrayList<>();
private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -63,12 +69,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
Timeout periodicFlushTask;
AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
- long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
+ long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutations) {
this.periodicalFlushTimer = periodicalFlushTimer;
this.table = table;
this.writeBufferSize = writeBufferSize;
this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
this.maxKeyValueSize = maxKeyValueSize;
+ this.maxMutations = maxMutations;
}
@Override
@@ -145,6 +152,10 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
this.futures.addAll(futures);
bufferedSize += heapSize;
if (bufferedSize >= writeBufferSize) {
+ LOG.trace("Flushing because write buffer size {} reached", writeBufferSize);
+ internalFlush();
+ } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
+ LOG.trace("Flushing because max mutations {} reached", maxMutations);
internalFlush();
}
}
@@ -172,6 +183,11 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
}
+ @Override
+ public int getMaxMutations() {
+ return maxMutations;
+ }
+
@Override
public Map<String, byte[]> getRequestAttributes() {
return table.getRequestAttributes();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 5dc9f6d3b41..05fe89ae237 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -38,6 +38,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
@@ -148,6 +150,8 @@ class AsyncConnectionConfiguration {
private final int maxKeyValueSize;
+ private final int bufferedMutatorMaxMutations;
+
AsyncConnectionConfiguration(Configuration conf) {
long operationTimeoutMs =
conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
@@ -200,6 +204,8 @@ class AsyncConnectionConfiguration {
TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
+ this.bufferedMutatorMaxMutations =
+ conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY, BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT);
}
long getMetaOperationTimeoutNs() {
@@ -285,4 +291,8 @@ class AsyncConnectionConfiguration {
int getMaxKeyValueSize() {
return maxKeyValueSize;
}
+
+ int getBufferedMutatorMaxMutations() {
+ return bufferedMutatorMaxMutations;
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index 24563367bbb..63d31cee088 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -213,6 +213,14 @@ public interface BufferedMutator extends Closeable {
return Collections.emptyMap();
}
+ /**
+ * The maximum number of mutations that this buffered mutator will buffer before flushing them
+ */
+ default int getMaxMutations() {
+ throw new UnsupportedOperationException(
+ "The BufferedMutator::getMaxMutations has not been implemented");
+ }
+
/**
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
*/
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index 44bc5e2be7c..88cc062b5bc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -41,6 +41,7 @@ public class BufferedMutatorParams implements Cloneable {
private String implementationClassName = null;
private int rpcTimeout = UNSET;
private int operationTimeout = UNSET;
+ private int maxMutations = UNSET;
protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
@@ -89,6 +90,23 @@ public class BufferedMutatorParams implements Cloneable {
return operationTimeout;
}
+ /**
+ * Set the maximum number of mutations that this buffered mutator will buffer before flushing
+ * them. If you are talking to a cluster that uses hbase.rpc.rows.size.threshold.reject to reject
+ * large Multi requests, you may need this setting to avoid rejections. Default is no limit.
+ */
+ public BufferedMutatorParams setMaxMutations(int maxMutations) {
+ this.maxMutations = maxMutations;
+ return this;
+ }
+
+ /**
+ * The maximum number of mutations that this buffered mutator will buffer before flushing them
+ */
+ public int getMaxMutations() {
+ return maxMutations;
+ }
+
public BufferedMutatorParams setRequestAttribute(String key, byte[] value) {
if (requestAttributes.isEmpty()) {
requestAttributes = new HashMap<>();
@@ -222,6 +240,7 @@ public class BufferedMutatorParams implements Cloneable {
clone.writeBufferPeriodicFlushTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs;
clone.writeBufferPeriodicFlushTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
clone.maxKeyValueSize = this.maxKeyValueSize;
+ clone.maxMutations = this.maxMutations;
clone.pool = this.pool;
clone.listener = this.listener;
clone.implementationClassName = this.implementationClassName;
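
The synchronous client gets the same knob through BufferedMutatorParams. A hedged sketch under the same illustrative assumptions (connection, table name, and column coordinates are placeholders, not part of this patch):

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.BufferedMutatorParams;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    class MaxMutationsParamsSketch {
      static void writeWithCap(Connection conn) throws IOException {
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("my_table"))
          .setMaxMutations(100); // flush after 100 buffered mutations; default is no per-mutator limit
        try (BufferedMutator mutator = conn.getBufferedMutator(params)) {
          mutator.mutate(new Put(Bytes.toBytes("row-1"))
            .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
        } // close() flushes whatever is still buffered
      }
    }
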
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 56f3a65c957..068f0e459a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -41,6 +41,9 @@ public class ConnectionConfiguration {
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
+ public static final String BUFFERED_MUTATOR_MAX_MUTATIONS_KEY =
+ "hbase.client.write.buffer.maxmutations";
+ public static final int BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT = -1;
public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
"hbase.client.primaryCallTimeout.get";
public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms
@@ -66,6 +69,7 @@ public class ConnectionConfiguration {
private final int metaReplicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
+ private final int bufferedMutatorMaxMutations;
private final int rpcTimeout;
private final int readRpcTimeout;
private final int metaReadRpcTimeout;
@@ -117,6 +121,9 @@ public class ConnectionConfiguration {
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
+ this.bufferedMutatorMaxMutations =
+ conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY, BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT);
+
this.rpcTimeout =
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
@@ -148,6 +155,7 @@ public class ConnectionConfiguration {
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
+ this.bufferedMutatorMaxMutations = BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT;
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
@@ -210,6 +218,10 @@ public class ConnectionConfiguration {
return maxKeyValueSize;
}
+ public int getBufferedMutatorMaxMutations() {
+ return bufferedMutatorMaxMutations;
+ }
+
public long getScannerMaxResultSize() {
return scannerMaxResultSize;
}
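
The new limit can also be applied connection-wide through the hbase.client.write.buffer.maxmutations key added above, rather than per mutator. A sketch, assuming the rest of the client configuration (quorum and so on) is already in place:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    class MaxMutationsConfigSketch {
      static Connection connect() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // The default, -1, means no limit; a positive value caps how many mutations a
        // BufferedMutator created from this connection will buffer before flushing.
        conf.setInt("hbase.client.write.buffer.maxmutations", 100);
        return ConnectionFactory.createConnection(conf);
      }
    }
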
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
index d299e453266..471cfa87445 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
@@ -107,6 +107,9 @@ class ConnectionOverAsyncConnection implements Connection {
if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
}
+ if (params.getMaxMutations() != BufferedMutatorParams.UNSET) {
+ builder.setMaxMutations(params.getMaxMutations());
+ }
if (!params.getRequestAttributes().isEmpty()) {
builder.setRequestAttributes(params.getRequestAttributes());
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
index ba23d105393..fdc7c305500 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
@@ -140,8 +140,8 @@ public class TestBufferedMutatorParams {
BufferedMutator.ExceptionListener listener = new MockExceptionListener();
bmp.writeBufferSize(17).setWriteBufferPeriodicFlushTimeoutMs(123)
- .setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).pool(pool)
- .listener(listener);
+ .setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).setMaxMutations(3737)
+ .pool(pool).listener(listener);
bmp.implementationClassName("someClassName");
BufferedMutatorParams clone = bmp.clone();
@@ -151,6 +151,7 @@ public class TestBufferedMutatorParams {
assertEquals(123, clone.getWriteBufferPeriodicFlushTimeoutMs());
assertEquals(456, clone.getWriteBufferPeriodicFlushTimerTickMs());
assertEquals(13, clone.getMaxKeyValueSize());
+ assertEquals(3737, clone.getMaxMutations());
assertEquals("someClassName", clone.getImplementationClassName());
cloneTest(bmp, clone);
@@ -178,6 +179,7 @@ public class TestBufferedMutatorParams {
assertEquals(some.getWriteBufferPeriodicFlushTimerTickMs(),
clone.getWriteBufferPeriodicFlushTimerTickMs());
assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize());
+ assertTrue(some.getMaxMutations() == clone.getMaxMutations());
assertTrue(some.getListener() == clone.getListener());
assertTrue(some.getPool() == clone.getPool());
assertEquals(some.getImplementationClassName(), clone.getImplementationClassName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index acfa25fecac..08e68def2c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -175,6 +175,23 @@ public class TestAsyncBufferMutator {
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
}
+ @Test
+ public void testMaxMutationsFlush() throws InterruptedException, ExecutionException {
+ AsyncBufferedMutator mutator =
+ CONN.getBufferedMutatorBuilder(TABLE_NAME).setMaxMutations(3).build();
+ CompletableFuture<?> future1 =
+ mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+ CompletableFuture<?> future2 =
+ mutator.mutate(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, VALUE));
+ CompletableFuture<?> future3 =
+ mutator.mutate(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, VALUE));
+ CompletableFuture.allOf(future1, future2, future3).join();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(1))).get().getValue(CF, CQ));
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(2))).get().getValue(CF, CQ));
+ }
+
// a bit deep into the implementation
@Test
public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
@@ -244,8 +261,9 @@ public class TestAsyncBufferMutator {
private int flushCount;
AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
- long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
- super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize);
+ long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutation) {
+ super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize,
+ maxMutation);
}
@Override
@@ -261,7 +279,7 @@ public class TestAsyncBufferMutator {
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
try (AsyncBufferMutatorForTest mutator =
new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
- 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024)) {
+ 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024, 100)) {
CompletableFuture<?> future = mutator.mutate(put);
Timeout task = mutator.periodicFlushTask;
// we should have scheduled a periodic flush task