This is an automated email from the ASF dual-hosted git repository.
cconnell pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 564d188effd HBASE-29385: Improve performance of AggregrateImplementation quota checks (#7083)
564d188effd is described below
commit 564d188effd28b53e8f7b6459b7ac91581950e95
Author: Charles Connell <[email protected]>
AuthorDate: Wed Jun 18 08:17:56 2025 -0400
HBASE-29385: Improve performance of AggregrateImplementation quota checks (#7083)
Signed-off-by: Ray Mattingly <[email protected]>
---
.../hbase/coprocessor/AggregateImplementation.java | 92 ++++++++++++++--------
.../coprocessor/TestAggregateImplementation.java | 23 +++++-
2 files changed, 83 insertions(+), 32 deletions(-)
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index 930596a4b78..b991ffe2ebb 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -24,14 +24,15 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
@@ -84,13 +85,14 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
InternalScanner scanner = null;
AggregateResponse response = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
T max = null;
boolean hasMoreRows = true;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<>();
byte[] colFamily = scan.getFamilies()[0];
@@ -115,6 +117,11 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
} while (hasMoreRows);
response = singlePartResponse(request, hasMoreRows,
partialResultContext, max,
ci::getProtoForCellType);
+ if (log.isDebugEnabled()) {
+ log.debug("Maximum from this region is {}: {} (partial result: {})
(client {})",
+ env.getRegion().getRegionInfo().getRegionNameAsString(), max,
hasMoreRows,
+ RpcServer.getRequestUser());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -123,9 +130,6 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
closeQuota(partialResultContext);
}
- log.debug("Maximum from this region is {}: {} (partial result: {}) (client
{})",
- env.getRegion().getRegionInfo().getRegionNameAsString(), max,
hasMoreRows,
- RpcServer.getRequestUser());
done.run(response);
}
@@ -140,13 +144,14 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
T min = null;
boolean hasMoreRows = true;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<>();
byte[] colFamily = scan.getFamilies()[0];
@@ -170,6 +175,11 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
} while (hasMoreRows);
response = singlePartResponse(request, hasMoreRows,
partialResultContext, min,
ci::getProtoForCellType);
+ if (log.isDebugEnabled()) {
+ log.debug("Minimum from this region is {}: {} (partial result: {})
(client {})",
+ env.getRegion().getRegionInfo().getRegionNameAsString(), min,
hasMoreRows,
+ RpcServer.getRequestUser());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -178,9 +188,6 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
closeQuota(partialResultContext);
}
- log.debug("Minimum from this region is {}: {} (partial result: {}) (client
{})",
- env.getRegion().getRegionInfo().getRegionNameAsString(), min,
hasMoreRows,
- RpcServer.getRequestUser());
done.run(response);
}
@@ -195,7 +202,7 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
long sum = 0L;
boolean hasMoreRows = true;
try {
@@ -203,6 +210,7 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
S sumVal = null;
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -228,6 +236,11 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
} while (hasMoreRows);
response = singlePartResponse(request, hasMoreRows,
partialResultContext, sumVal,
ci::getProtoForPromotedType);
+ if (log.isDebugEnabled()) {
+ log.debug("Sum from this region is {}: {} (partial result: {}) (client
{})",
+ env.getRegion().getRegionInfo().getRegionNameAsString(), sum,
hasMoreRows,
+ RpcServer.getRequestUser());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -236,9 +249,6 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
closeQuota(partialResultContext);
}
- log.debug("Sum from this region is {}: {} (partial result: {}) (client
{})",
- env.getRegion().getRegionInfo().getRegionNameAsString(), sum,
hasMoreRows,
- RpcServer.getRequestUser());
done.run(response);
}
@@ -253,10 +263,11 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
long counter = 0L;
List<Cell> results = new ArrayList<>();
InternalScanner scanner = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
boolean hasMoreRows = true;
try {
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
byte[][] colFamilies = scan.getFamilies();
byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
NavigableSet<byte[]> qualifiers =
@@ -284,6 +295,11 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
bb.rewind();
response = responseBuilder(request, hasMoreRows, partialResultContext)
.addFirstPart(ByteString.copyFrom(bb)).build();
+ if (log.isDebugEnabled()) {
+ log.debug("Row counter from this region is {}: {} (partial result: {})
(client {})",
+ env.getRegion().getRegionInfo().getRegionNameAsString(), counter,
hasMoreRows,
+ RpcServer.getRequestUser());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -292,9 +308,6 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
closeQuota(partialResultContext);
}
- log.debug("Row counter from this region is {}: {} (partial result: {})
(client {})",
- env.getRegion().getRegionInfo().getRegionNameAsString(), counter,
hasMoreRows,
- RpcServer.getRequestUser());
done.run(response);
}
@@ -313,12 +326,13 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
S sumVal = null;
Long rowCountVal = 0L;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -388,12 +402,13 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
InternalScanner scanner = null;
AggregateResponse response = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
S sumVal = null, sumSqVal = null, tempVal = null;
long rowCountVal = 0L;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -469,11 +484,12 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -539,18 +555,34 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
private final static class PartialResultContext {
+ private long rowsRead = 0;
+ private final int quotaCheckInterval;
private OperationQuota quota = null;
private long waitIntervalMs = 0;
- private byte[] lastRowSuccessfullyProcessedArray = null;
- private int lastRowSuccessfullyProcessedOffset = 0;
- private int lastRowSuccessfullyProcessedLength = 0;
+ private Cell lastRowSuccessfullyProcessed = null;
private long previousReadConsumed = 0;
private long previousReadConsumedDifference = 0;
+
+ private PartialResultContext(int quotaCheckInterval) {
+ this.quotaCheckInterval = quotaCheckInterval;
+ }
+ }
+
+ private PartialResultContext newPartialResultContext(Scan scan) {
+ if (scan.getCaching() > 0) {
+ // If the scan has caching set, we will use that as the quota check
interval.
+ return new PartialResultContext(scan.getCaching());
+ } else {
+ return new PartialResultContext(
+ env.getConfiguration().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
1000));
+ }
}
private boolean shouldBreakForThrottling(AggregateRequest request, Scan scan,
PartialResultContext context) throws IOException {
- if (request.getClientSupportsPartialResult()) {
+ if (
+ request.getClientSupportsPartialResult() && context.rowsRead %
context.quotaCheckInterval == 0
+ ) {
long maxBlockBytesScanned;
if (context.quota == null) {
maxBlockBytesScanned = Long.MAX_VALUE;
@@ -573,14 +605,13 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
private void postScanPartialResultUpdate(List<Cell> results,
PartialResultContext context) {
+ context.rowsRead += 1;
if (context.quota != null) {
context.quota.addScanResultCells(results);
}
if (!results.isEmpty()) {
Cell result = results.get(results.size() - 1);
- context.lastRowSuccessfullyProcessedArray = result.getRowArray();
- context.lastRowSuccessfullyProcessedOffset = result.getRowOffset();
- context.lastRowSuccessfullyProcessedLength = result.getRowLength();
+ context.lastRowSuccessfullyProcessed = result;
}
}
@@ -607,10 +638,9 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
PartialResultContext context) {
AggregateResponse.Builder builder = AggregateResponse.newBuilder();
if (request.getClientSupportsPartialResult() && hasMoreRows) {
- if (context.lastRowSuccessfullyProcessedArray != null) {
- byte[] lastRowSuccessfullyProcessed = Arrays.copyOfRange(
- context.lastRowSuccessfullyProcessedArray,
context.lastRowSuccessfullyProcessedOffset,
- context.lastRowSuccessfullyProcessedOffset +
context.lastRowSuccessfullyProcessedLength);
+ if (context.lastRowSuccessfullyProcessed != null) {
+ byte[] lastRowSuccessfullyProcessed =
+ CellUtil.cloneRow(context.lastRowSuccessfullyProcessed);
builder.setNextChunkStartRow(ByteString.copyFrom(
ClientUtil.calculateTheClosestNextRowKeyForPrefix(lastRowSuccessfullyProcessed)));
} else {
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
index d2827aa1d33..bd65cd9c36a 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
@@ -99,7 +99,7 @@ public class TestAggregateImplementation {
when(region.getRegionInfo()).thenReturn(regionInfo);
when(regionInfo.getRegionNameAsString()).thenReturn("testRegion");
- scan = new Scan().addColumn(CF, CQ);
+ scan = new Scan().addColumn(CF, CQ).setCaching(1);
scanner = mock(RegionScannerImpl.class);
doAnswer(createMockScanner()).when(scanner).next(any(List.class));
@@ -530,6 +530,27 @@ public class TestAggregateImplementation {
response2.hasNextChunkStartRow());
}
+ @Test
+ public void testRowNumWithScannerCaching() throws Exception {
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // set caching such that throttles are not triggered
+ scan = new Scan().addColumn(CF, CQ).setCaching(5);
+ request = AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(true).build();
+ aggregate.getRowNum(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+ assertFalse("Response should not indicate there are more rows",
+ response.hasNextChunkStartRow());
+ assertEquals(NUM_ROWS,
response.getFirstPart(0).asReadOnlyByteBuffer().getLong());
+ }
+
@Test
public void testRowNumThrottleWithNoResults() throws Exception {
AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))