This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 32fb393ef39 HBASE-29148: BufferedMutator should be able to flush after
buffering a certain number of mutations (#6718)
32fb393ef39 is described below
commit 32fb393ef39883f1c6416ddd74e3110e4698dfa8
Author: Charles Connell <[email protected]>
AuthorDate: Mon Mar 3 11:02:22 2025 -0500
HBASE-29148: BufferedMutator should be able to flush after buffering a
certain number of mutations (#6718)
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Nick Dimiduk <[email protected]>
---
.../hadoop/hbase/client/AsyncBufferedMutator.java | 5 ++
.../hbase/client/AsyncBufferedMutatorBuilder.java | 7 ++
.../client/AsyncBufferedMutatorBuilderImpl.java | 12 +++-
.../hbase/client/AsyncBufferedMutatorImpl.java | 19 +++++-
.../hbase/client/AsyncConnectionConfiguration.java | 7 ++
.../hadoop/hbase/client/BufferedMutator.java | 8 +++
.../hadoop/hbase/client/BufferedMutatorImpl.java | 12 +++-
.../hadoop/hbase/client/BufferedMutatorParams.java | 19 ++++++
.../hbase/client/ConnectionConfiguration.java | 12 ++++
.../hbase/client/ConnectionImplementation.java | 3 +
.../hbase/client/TestBufferedMutatorParams.java | 6 +-
.../hbase/client/TestAsyncBufferMutator.java | 24 ++++++-
.../hadoop/hbase/client/TestBufferedMutator2.java | 79 ++++++++++++++++++++++
13 files changed, 204 insertions(+), 9 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
index e5f28d2e060..27e11219c03 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
@@ -87,6 +87,11 @@ public interface AsyncBufferedMutator extends Closeable {
*/
long getWriteBufferSize();
+ /**
+ * The maximum number of mutations that this buffered mutator will buffer
before flushing them
+ */
+ int getMaxMutations();
+
/**
* Returns the periodical flush interval, 0 means disabled.
*/
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
index ed21fb8e23e..11aac4bd1d3 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -103,6 +103,13 @@ public interface AsyncBufferedMutatorBuilder {
*/
AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);
+ /**
+ * Set the maximum number of mutations that this buffered mutator will
buffer before flushing
+ * them. If you are talking to a cluster that uses
hbase.rpc.rows.size.threshold.reject to reject
+ * large Multi requests, you may need this setting to avoid rejections.
Default is no limit.
+ */
+ AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations);
+
/**
* Create the {@link AsyncBufferedMutator} instance.
*/
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index ede5b359e83..698947c8754 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -39,12 +39,15 @@ class AsyncBufferedMutatorBuilderImpl implements
AsyncBufferedMutatorBuilder {
private int maxKeyValueSize;
+ private int maxMutations;
+
public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
this.tableBuilder = tableBuilder;
this.writeBufferSize = connConf.getWriteBufferSize();
this.periodicFlushTimeoutNs =
connConf.getWriteBufferPeriodicFlushTimeoutNs();
this.maxKeyValueSize = connConf.getMaxKeyValueSize();
+ this.maxMutations = connConf.getBufferedMutatorMaxMutations();
this.periodicalFlushTimer = periodicalFlushTimer;
}
@@ -100,9 +103,16 @@ class AsyncBufferedMutatorBuilderImpl implements
AsyncBufferedMutatorBuilder {
return this;
}
+ @Override
+ public AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations) {
+ Preconditions.checkArgument(maxMutations > 0, "maxMutations %d must be >
0", maxMutations);
+ this.maxMutations = maxMutations;
+ return this;
+ }
+
@Override
public AsyncBufferedMutator build() {
return new AsyncBufferedMutatorImpl(periodicalFlushTimer,
tableBuilder.build(), writeBufferSize,
- periodicFlushTimeoutNs, maxKeyValueSize);
+ periodicFlushTimeoutNs, maxKeyValueSize, maxMutations);
}
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index ce4193d9138..9f74323785c 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -31,6 +31,8 @@ import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@@ -41,6 +43,8 @@ import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@InterfaceAudience.Private
class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);
+
private final HashedWheelTimer periodicalFlushTimer;
private final AsyncTable<?> table;
@@ -51,6 +55,8 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
private final int maxKeyValueSize;
+ private final int maxMutations;
+
private List<Mutation> mutations = new ArrayList<>();
private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -62,12 +68,13 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
Timeout periodicFlushTask;
AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer,
AsyncTable<?> table,
- long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
+ long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize,
int maxMutations) {
this.periodicalFlushTimer = periodicalFlushTimer;
this.table = table;
this.writeBufferSize = writeBufferSize;
this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
this.maxKeyValueSize = maxKeyValueSize;
+ this.maxMutations = maxMutations;
}
@Override
@@ -144,6 +151,10 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
this.futures.addAll(futures);
bufferedSize += heapSize;
if (bufferedSize >= writeBufferSize) {
+ LOG.trace("Flushing because write buffer size {} reached",
writeBufferSize);
+ internalFlush();
+ } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
+ LOG.trace("Flushing because max mutations {} reached", maxMutations);
internalFlush();
}
}
@@ -170,4 +181,10 @@ class AsyncBufferedMutatorImpl implements
AsyncBufferedMutator {
public long getPeriodicalFlushTimeout(TimeUnit unit) {
return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
}
+
+ @Override
+ public int getMaxMutations() {
+ return maxMutations;
+ }
+
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 5fb95ebbd87..14bc0598d84 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -101,6 +101,8 @@ class AsyncConnectionConfiguration {
private final int maxKeyValueSize;
+ private final int bufferedMutatorMaxMutations;
+
AsyncConnectionConfiguration(Configuration conf) {
ConnectionConfiguration connectionConf = new ConnectionConfiguration(conf);
@@ -111,6 +113,7 @@ class AsyncConnectionConfiguration {
this.writeBufferPeriodicFlushTimeoutNs =
connectionConf.getWriteBufferPeriodicFlushTimeoutMs();
this.maxKeyValueSize = connectionConf.getMaxKeyValueSize();
this.maxRetries = connectionConf.getRetriesNumber();
+ this.bufferedMutatorMaxMutations =
connectionConf.getBufferedMutatorMaxMutations();
// fields from connection configuration that need to be converted to nanos
this.metaOperationTimeoutNs =
@@ -229,4 +232,8 @@ class AsyncConnectionConfiguration {
int getMaxKeyValueSize() {
return maxKeyValueSize;
}
+
+ int getBufferedMutatorMaxMutations() {
+ return bufferedMutatorMaxMutations;
+ }
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index 7c95643ecc9..84a39ec3802 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -194,6 +194,14 @@ public interface BufferedMutator extends Closeable {
"The BufferedMutator::setOperationTimeout has not been implemented");
}
+ /**
+ * The maximum number of mutations that this buffered mutator will buffer
before flushing them
+ */
+ default int getMaxMutations() {
+ throw new UnsupportedOperationException(
+ "The BufferedMutator::getMaxMutations has not been implemented");
+ }
+
/**
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
*/
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index e0795c7c4c6..98ce2dfa625 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -86,6 +86,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
private Timer writeBufferPeriodicFlushTimer = null;
private final int maxKeyValueSize;
+ private final int maxMutations;
private final ExecutorService pool;
private final AtomicInteger rpcTimeout;
private final AtomicInteger operationTimeout;
@@ -128,6 +129,10 @@ public class BufferedMutatorImpl implements
BufferedMutator {
? params.getMaxKeyValueSize()
: tableConf.getMaxKeyValueSize();
+ this.maxMutations = params.getMaxMutations() != UNSET
+ ? params.getMaxMutations()
+ : conn.getConnectionConfiguration().getBufferedMutatorMaxMutations();
+
this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != UNSET
? params.getRpcTimeout()
: conn.getConnectionConfiguration().getWriteRpcTimeout());
@@ -280,8 +285,11 @@ public class BufferedMutatorImpl implements
BufferedMutator {
throws InterruptedIOException, RetriesExhaustedWithDetailsException {
List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();
while (true) {
- if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) {
- // There is the room to accept more mutations.
+ if (
+ !flushAll && (currentWriteBufferSize.get() <= writeBufferSize)
+ && (maxMutations == UNSET || size() < maxMutations)
+ ) {
+ // There is room to accept more mutations.
break;
}
AsyncRequestFuture asf;
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index ee614716f86..ecc7d6e0031 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -38,6 +38,7 @@ public class BufferedMutatorParams implements Cloneable {
private String implementationClassName = null;
private int rpcTimeout = UNSET;
private int operationTimeout = UNSET;
+ private int maxMutations = UNSET;
private BufferedMutator.ExceptionListener listener = new
BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException exception,
@@ -85,6 +86,23 @@ public class BufferedMutatorParams implements Cloneable {
return operationTimeout;
}
+ /**
+ * Set the maximum number of mutations that this buffered mutator will
buffer before flushing
+ * them. If you are talking to a cluster that uses
hbase.rpc.rows.size.threshold.reject to reject
+ * large Multi requests, you may need this setting to avoid rejections.
Default is no limit.
+ */
+ public BufferedMutatorParams setMaxMutations(int maxMutations) {
+ this.maxMutations = maxMutations;
+ return this;
+ }
+
+ /**
+ * The maximum number of mutations that this buffered mutator will buffer
before flushing them
+ */
+ public int getMaxMutations() {
+ return maxMutations;
+ }
+
/**
* Override the write buffer size specified by the provided {@link
Connection}'s
* {@link org.apache.hadoop.conf.Configuration} instance, via the
configuration key
@@ -188,6 +206,7 @@ public class BufferedMutatorParams implements Cloneable {
clone.writeBufferPeriodicFlushTimeoutMs =
this.writeBufferPeriodicFlushTimeoutMs;
clone.writeBufferPeriodicFlushTimerTickMs =
this.writeBufferPeriodicFlushTimerTickMs;
clone.maxKeyValueSize = this.maxKeyValueSize;
+ clone.maxMutations = this.maxMutations;
clone.pool = this.pool;
clone.listener = this.listener;
clone.implementationClassName = this.implementationClassName;
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 93fa2600d89..be118857fc6 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -47,6 +47,9 @@ public class ConnectionConfiguration {
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT =
1000L; // 1 second
public static final String MAX_KEYVALUE_SIZE_KEY =
"hbase.client.keyvalue.maxsize";
public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
+ public static final String BUFFERED_MUTATOR_MAX_MUTATIONS_KEY =
+ "hbase.client.write.buffer.maxmutations";
+ public static final int BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT = -1;
public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
"hbase.client.primaryCallTimeout.get";
public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; //
10ms
@@ -88,6 +91,7 @@ public class ConnectionConfiguration {
private final int metaReplicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
+ private final int bufferedMutatorMaxMutations;
private final int rpcTimeout;
private final int readRpcTimeout;
private final int metaReadRpcTimeout;
@@ -143,6 +147,9 @@ public class ConnectionConfiguration {
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY,
MAX_KEYVALUE_SIZE_DEFAULT);
+ this.bufferedMutatorMaxMutations =
+ conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY,
BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT);
+
this.rpcTimeout =
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
@@ -193,6 +200,7 @@ public class ConnectionConfiguration {
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
this.clientScannerAsyncPrefetch =
Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
+ this.bufferedMutatorMaxMutations = BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT;
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
@@ -259,6 +267,10 @@ public class ConnectionConfiguration {
return maxKeyValueSize;
}
+ public int getBufferedMutatorMaxMutations() {
+ return bufferedMutatorMaxMutations;
+ }
+
public long getScannerMaxResultSize() {
return scannerMaxResultSize;
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index eef82643b5b..a85753fcc79 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -485,6 +485,9 @@ public class ConnectionImplementation implements
ClusterConnection, Closeable {
if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
}
+ if (params.getMaxMutations() == BufferedMutatorParams.UNSET) {
+
params.setMaxMutations(connectionConfig.getBufferedMutatorMaxMutations());
+ }
// Look to see if an alternate BufferedMutation implementation is wanted.
// Look in params and in config. If null, use default.
String implementationClassName = params.getImplementationClassName();
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
index ba23d105393..fdc7c305500 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
@@ -140,8 +140,8 @@ public class TestBufferedMutatorParams {
BufferedMutator.ExceptionListener listener = new MockExceptionListener();
bmp.writeBufferSize(17).setWriteBufferPeriodicFlushTimeoutMs(123)
-
.setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).pool(pool)
- .listener(listener);
+
.setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).setMaxMutations(3737)
+ .pool(pool).listener(listener);
bmp.implementationClassName("someClassName");
BufferedMutatorParams clone = bmp.clone();
@@ -151,6 +151,7 @@ public class TestBufferedMutatorParams {
assertEquals(123, clone.getWriteBufferPeriodicFlushTimeoutMs());
assertEquals(456, clone.getWriteBufferPeriodicFlushTimerTickMs());
assertEquals(13, clone.getMaxKeyValueSize());
+ assertEquals(3737, clone.getMaxMutations());
assertEquals("someClassName", clone.getImplementationClassName());
cloneTest(bmp, clone);
@@ -178,6 +179,7 @@ public class TestBufferedMutatorParams {
assertEquals(some.getWriteBufferPeriodicFlushTimerTickMs(),
clone.getWriteBufferPeriodicFlushTimerTickMs());
assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize());
+ assertTrue(some.getMaxMutations() == clone.getMaxMutations());
assertTrue(some.getListener() == clone.getListener());
assertTrue(some.getPool() == clone.getPool());
assertEquals(some.getImplementationClassName(),
clone.getImplementationClassName());
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index b479d4de573..2802c77b5dd 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -175,6 +175,23 @@ public class TestAsyncBufferMutator {
assertArrayEquals(VALUE, table.get(new
Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
}
+ @Test
+ public void testMaxMutationsFlush() throws InterruptedException,
ExecutionException {
+ AsyncBufferedMutator mutator =
+ CONN.getBufferedMutatorBuilder(TABLE_NAME).setMaxMutations(3).build();
+ CompletableFuture<?> future1 =
+ mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+ CompletableFuture<?> future2 =
+ mutator.mutate(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, VALUE));
+ CompletableFuture<?> future3 =
+ mutator.mutate(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, VALUE));
+ CompletableFuture.allOf(future1, future2, future3).join();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new
Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ assertArrayEquals(VALUE, table.get(new
Get(Bytes.toBytes(1))).get().getValue(CF, CQ));
+ assertArrayEquals(VALUE, table.get(new
Get(Bytes.toBytes(2))).get().getValue(CF, CQ));
+ }
+
// a bit deep into the implementation
@Test
public void testCancelPeriodicFlush() throws InterruptedException,
ExecutionException {
@@ -244,8 +261,9 @@ public class TestAsyncBufferMutator {
private int flushCount;
AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer,
AsyncTable<?> table,
- long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
- super(periodicalFlushTimer, table, writeBufferSize,
periodicFlushTimeoutNs, maxKeyValueSize);
+ long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize,
int maxMutation) {
+ super(periodicalFlushTimer, table, writeBufferSize,
periodicFlushTimeoutNs, maxKeyValueSize,
+ maxMutation);
}
@Override
@@ -261,7 +279,7 @@ public class TestAsyncBufferMutator {
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
try (AsyncBufferMutatorForTest mutator =
new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER,
CONN.getTable(TABLE_NAME),
- 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024))
{
+ 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024,
100)) {
CompletableFuture<?> future = mutator.mutate(put);
Timeout task = mutator.periodicFlushTask;
// we should have scheduled a periodic flush task
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator2.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator2.java
new file mode 100644
index 00000000000..2dd2057b8c0
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator2.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestBufferedMutator2 {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBufferedMutator2.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("example-table");
+
+ private static byte[] CF = Bytes.toBytes("cf");
+ private static byte[] CQ = Bytes.toBytes("cq");
+ private static byte[] VALUE = new byte[1024];
+
+ private static Connection CONN;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.createTable(TABLE_NAME, CF);
+ CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+ Bytes.random(VALUE);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ CONN.close();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testMaxMutationsFlush() throws IOException {
+ BufferedMutator mutator =
+ CONN.getBufferedMutator(new
BufferedMutatorParams(TABLE_NAME).setMaxMutations(3));
+ mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+ mutator.mutate(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, VALUE));
+ mutator.mutate(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, VALUE));
+ Table table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).getValue(CF,
CQ));
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(1))).getValue(CF,
CQ));
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(2))).getValue(CF,
CQ));
+ }
+
+}