lollipopjin commented on code in PR #10519:
URL: https://github.com/apache/rocketmq/pull/10519#discussion_r3452352655
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java:
##########
@@ -186,6 +206,248 @@ public CompletableFuture<RemotingCommand>
processRequestAsync(final Channel chan
});
}
+ protected CompletableFuture<RemotingCommand>
processBatchRequestAsync(final Channel channel,
+ RemotingCommand request, boolean brokerAllowSuspend) {
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+
+ BatchChangeInvisibleTimeRequestHeader batchRequestHeader;
+ try {
+ batchRequestHeader = (BatchChangeInvisibleTimeRequestHeader)
+
request.decodeCommandCustomHeader(BatchChangeInvisibleTimeRequestHeader.class);
+ } catch (Throwable t) {
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark("batch change invisible time request header is
invalid");
+ return CompletableFuture.completedFuture(response);
+ }
+
+ BatchChangeInvisibleTimeRequestBody requestBody =
+ BatchChangeInvisibleTimeRequestBody.decode(request.getBody(),
BatchChangeInvisibleTimeRequestBody.class);
+ List<ChangeInvisibleTimeRequestEntry> requestEntries = requestBody ==
null || requestBody.getEntries() == null ?
+ Collections.emptyList() : requestBody.getEntries();
+ int batchMaxNum = Math.max(1,
brokerController.getBrokerConfig().getBatchChangeInvisibleTimeMaxNum());
+ if (requestEntries.size() > batchMaxNum) {
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark(String.format("batch change invisible time
entries exceed limit: %d",
+ batchMaxNum));
+ return CompletableFuture.completedFuture(response);
+ }
+
+ ChangeInvisibleTimeResponseEntry[] responseEntries = new
ChangeInvisibleTimeResponseEntry[requestEntries.size()];
+ for (int i = 0; i < requestEntries.size(); i++) {
+ responseEntries[i] =
buildFailedResponseEntry(ResponseCode.SYSTEM_ERROR);
+ }
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ if (!validateBatchRequestEntries(batchRequestHeader, requestEntries)) {
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark("batch change invisible time entries must use
the same topic and consumerGroup as request header");
+ return CompletableFuture.completedFuture(response);
+ }
+
+ if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable())
{
+ List<ChangeInvisibleTimeRequestEntry> kvChangeRecords = new
ArrayList<>();
+ List<Integer> kvIndexes = new ArrayList<>();
+ List<ChangeInvisibleTimeResponseEntry> kvSuccessEntries = new
ArrayList<>();
+ for (int i = 0; i < requestEntries.size(); i++) {
+ ChangeInvisibleTimeRequestEntry requestEntry =
requestEntries.get(i);
+ if (tryAppendBatchKvChange(channel, requestEntry, i,
responseEntries, kvChangeRecords,
+ kvIndexes, kvSuccessEntries)) {
+ continue;
+ }
+ appendSingleBatchEntry(channel, requestEntry,
request.getOpaque(), brokerAllowSuspend,
+ responseEntries, futures, i);
+ }
+
+ if (!kvChangeRecords.isEmpty()) {
+ try {
+
brokerController.getPopConsumerService().batchChangeInvisibilityDuration(kvChangeRecords);
+ for (int i = 0; i < kvIndexes.size(); i++) {
+ responseEntries[kvIndexes.get(i)] =
kvSuccessEntries.get(i);
+ }
+ } catch (Throwable t) {
+ POP_LOGGER.error("batch change invisibility duration
failed", t);
+ for (Integer index : kvIndexes) {
+ responseEntries[index] =
buildFailedResponseEntry(ResponseCode.SYSTEM_ERROR);
+ }
+ }
+ }
+ } else {
+ for (int i = 0; i < requestEntries.size(); i++) {
+ appendSingleBatchEntry(channel, requestEntries.get(i),
request.getOpaque(), brokerAllowSuspend,
+ responseEntries, futures, i);
+ }
+ }
+
+ return CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).thenApply(ignored -> {
+ BatchChangeInvisibleTimeResponseBody responseBody = new
BatchChangeInvisibleTimeResponseBody();
+ responseBody.setEntries(Arrays.asList(responseEntries));
+ response.setBody(responseBody.encode());
+ return response;
+ });
+ }
+
+ protected boolean
validateBatchRequestEntries(BatchChangeInvisibleTimeRequestHeader
batchRequestHeader,
Review Comment:
How about renaming it to `normalizeAndValidateBatchRequestEntries` to reflect
that it mutates the input `requestEntries` (back-filling `consumerGroup` /
`topic` from the header)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]