qianye1001 opened a new issue, #10489:
URL: https://github.com/apache/rocketmq/issues/10489

   ### Before Creating the Enhancement Request
   
   - [x] I have confirmed that this should be classified as an enhancement 
rather than a bug/feature.
   
   ### Summary
   
   Add `BatchChangeInvisibleTime` support in the Proxy and Broker, following 
the existing `BatchAck` pattern. Currently, when the Proxy needs to change 
invisible time for multiple messages (client disconnect, write-back failure, 
periodic renewal, message filtering), it sends individual 
`CHANGE_MESSAGE_INVISIBLETIME` requests to the Broker one by one. This causes 
excessive network round-trips and, on the PopConsumerKVService (RocksDB) path, 
excessive RocksDB flushes (2N `db.write()` calls for N messages).
   
   ### Motivation
   
   Multiple scenarios in the Proxy require batch `changeInvisibleTime`:
   
   1. **Client disconnect** (`DefaultReceiptHandleManager.returnHandleGroup`): 
When a client goes offline, the Proxy scans all unacked receipt handles for 
that client and calls `changeInvisibleTime` for each one individually. A 
consumer with many in-flight messages triggers N separate network round-trips.
   
   2. **ReceiveMessage write-back failure** 
(`ReceiveMessageResponseStreamWriter`): When writing messages back to the gRPC 
client fails mid-stream, the Proxy nacks all remaining messages one by one.
   
   3. **Periodic renewal** (`DefaultReceiptHandleManager.scheduleRenewTask`): 
The Proxy periodically renews receipt handles that are about to expire. Each 
handle is renewed individually via a separate `changeInvisibleTime` call.
   
   4. **Pop message filtering** (`ConsumerProcessor.filterPopResult`): Messages 
filtered as `TO_RETURN` have their invisible time changed individually.
   
   On the Broker side with `PopConsumerKVServiceEnable=true`, each individual 
`changeInvisibleTime` call in `PopConsumerService.changeInvisibilityDuration()` 
does:
   - `popConsumerStore.writeRecords(Collections.singletonList(ckRecord))` — 1 
RocksDB WriteBatch with 1 entry
   - `popConsumerStore.deleteRecords(Collections.singletonList(ackRecord))` — 1 
RocksDB WriteBatch with 1 entry
   
   For N messages, this results in 2N `db.write()` calls. Since 
`PopConsumerRocksdbStore` uses `sync=true` on WriteOptions, each write forces a 
WAL flush, making this a significant bottleneck.
   
   ### Describe the Solution You'd Like
   
   Add a new `BATCH_CHANGE_MESSAGE_INVISIBLETIME` request code, following the 
existing `BatchAck` (`BATCH_ACK_MESSAGE`) pattern:
   
   **Request format** — Each entry carries the same parameters as the single 
`ChangeInvisibleTimeRequestHeader`:
   - `consumerGroup`, `topic`, `queueId`, `extraInfo`, `offset`, 
`invisibleTime`, `liteTopic`, `suspend`
   
   **Response format** — Per-entry results with new receipt handle info (same 
as `ChangeInvisibleTimeResponseHeader`):
   - `code` (SUCCESS or error), `popTime`, `invisibleTime`, `reviveQid`
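
For illustration, the request and response bodies could be plain data classes like the ones below. This is a sketch, not the final API. `ChangeInvisibleTimeRequestEntry`, `BatchChangeInvisibleTimeRequestBody`, and `BatchChangeInvisibleTimeResponseBody` are the names proposed in this issue. The nesting and the `ChangeInvisibleTimeResultEntry` name are hypothetical. The sketch also assumes results come back in request order, so `results.get(i)` answers `entries.get(i)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed batch wire types. Field names mirror
// ChangeInvisibleTimeRequestHeader / ChangeInvisibleTimeResponseHeader.
final class BatchChangeInvisibleTimeTypes {

    /** One entry per message whose invisible time should change. */
    static final class ChangeInvisibleTimeRequestEntry {
        String consumerGroup;
        String topic;
        int queueId;
        String extraInfo;
        long offset;
        long invisibleTime;
        String liteTopic;
        boolean suspend;
    }

    /** Request body carrying N independent entries. */
    static final class BatchChangeInvisibleTimeRequestBody {
        List<ChangeInvisibleTimeRequestEntry> entries = new ArrayList<>();
    }

    /** Hypothetical per-entry result, same fields as the single-message response header. */
    static final class ChangeInvisibleTimeResultEntry {
        int code;           // SUCCESS or an error code
        long popTime;
        long invisibleTime;
        int reviveQid;
    }

    /** Response body; results.get(i) is assumed to answer entries.get(i). */
    static final class BatchChangeInvisibleTimeResponseBody {
        List<ChangeInvisibleTimeResultEntry> results = new ArrayList<>();
    }
}
```

A list of self-contained entries (rather than shared fields plus a list of offsets) keeps each entry independently routable. This matters because entries may differ in `invisibleTime` and `extraInfo`.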
   
   **Layer-by-layer changes:**
   
   1. **Remoting**: Add `RequestCode.BATCH_CHANGE_MESSAGE_INVISIBLETIME`, 
`BatchChangeInvisibleTimeRequestBody` (with 
`List<ChangeInvisibleTimeRequestEntry>`), 
`BatchChangeInvisibleTimeResponseBody` (with per-entry results)
   
   2. **Broker** — True batch processing:
      - `PopConsumerService.batchChangeInvisibilityDuration()`: Collect all CK 
records and ack records, then call `writeRecords(allCkRecords)` once and 
`deleteRecords(allAckRecords)` once. This reduces N messages from 2N 
`db.write()` to just 2 `db.write()` calls.
      - `ChangeInvisibleTimeProcessor`: Handle 
`BATCH_CHANGE_MESSAGE_INVISIBLETIME` — on KV path, delegate to the batch method 
above; on traditional revive path, iterate and call 
`appendCheckPointThenAckOrigin()` per entry.
      - `BrokerController`: Register the new request code with 
`changeInvisibleTimeProcessor`.
   
   3. **Client**: Add `batchChangeInvisibleTimeAsync()` to `MQClientAPIImpl` 
and `MQClientAPIExt`.
   
   4. **Proxy Service**: Add `batchChangeInvisibleTime()` to `MessageService`, 
`ClusterMessageService`, and `LocalMessageService`.
   
   5. **Proxy Processor**: Add `batchChangeInvisibleTime()` to 
`ConsumerProcessor` (group by broker, filter expired handles), 
`MessagingProcessor` interface, and `DefaultMessagingProcessor`.
   
   6. **Caller integration**:
      - `DefaultReceiptHandleManager.returnHandleGroup()`: Collect all handles, 
batch send
      - `DefaultReceiptHandleManager.scheduleRenewTask()`: Collect handles 
needing renewal per group key, batch send, update each handle with per-entry 
result
      - `ReceiveMessageResponseStreamWriter`: Collect all nack messages, batch 
send
      - `ConsumerProcessor.filterPopResult()`: Collect all TO_RETURN messages, 
batch send
   
   7. **Config**: Add an `enableBatchChangeInvisibleTime` flag (default `false`) 
for gradual rollout.
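
A rough sketch of the Proxy-side dispatch from items 5 to 7 follows. It drops expired handles, groups the rest by broker, and sends one batch request per broker when the flag is on. When the flag is off, it falls back to one request per handle. `Handle` is a simplified, hypothetical stand-in for the Proxy's receipt handle, and the `send` callback stands in for the client call; neither is an existing RocketMQ API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch of the Proxy-side dispatch for batch changeInvisibleTime.
// Handle and the send callback are illustrative stand-ins, not RocketMQ APIs.
final class ProxyBatchDispatch {

    /** Simplified stand-in for a decoded receipt handle. */
    static final class Handle {
        final String brokerName;
        final long offset;
        final long expireAtMillis;

        Handle(String brokerName, long offset, long expireAtMillis) {
            this.brokerName = brokerName;
            this.offset = offset;
            this.expireAtMillis = expireAtMillis;
        }
    }

    /** Drops expired handles and groups the rest by broker, preserving input order. */
    static Map<String, List<Handle>> groupByBroker(List<Handle> handles, long nowMillis) {
        Map<String, List<Handle>> byBroker = new LinkedHashMap<>();
        for (Handle h : handles) {
            if (h.expireAtMillis <= nowMillis) {
                continue; // expired handles are filtered out before sending
            }
            byBroker.computeIfAbsent(h.brokerName, k -> new ArrayList<>()).add(h);
        }
        return byBroker;
    }

    /**
     * Issues requests through send (broker name, handles in one request) and
     * returns the number of requests, i.e. network round-trips.
     */
    static int dispatch(List<Handle> handles, long nowMillis, boolean enableBatch,
                        BiConsumer<String, List<Handle>> send) {
        int requests = 0;
        for (Map.Entry<String, List<Handle>> group : groupByBroker(handles, nowMillis).entrySet()) {
            if (enableBatch) {
                // one BATCH_CHANGE_MESSAGE_INVISIBLETIME request per broker
                send.accept(group.getKey(), group.getValue());
                requests++;
            } else {
                // flag off: one CHANGE_MESSAGE_INVISIBLETIME request per handle, as today
                for (Handle h : group.getValue()) {
                    send.accept(group.getKey(), Collections.singletonList(h));
                    requests++;
                }
            }
        }
        return requests;
    }
}
```

Consider two brokers with three live handles between them. The batched path issues two requests instead of three, and the saving grows with the number of handles per broker. Keeping the per-handle path behind the flag lets both behaviors coexist during rollout.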
   
   ### Describe Alternatives You've Considered
   
   1. **Proxy-level batching only (no Broker changes)**: Group entries by 
broker at the Proxy, but still send individual `CHANGE_MESSAGE_INVISIBLETIME` 
requests. This reduces Proxy-side processing overhead but not the number of 
network round-trips or RocksDB flushes. Rejected because the Broker-side 
RocksDB flush reduction is the main performance win.
   
   2. **Reuse `BATCH_ACK_MESSAGE` with BitSet merging**: `BatchAck` merges 
multiple offsets sharing the same CK into a BitSet. This doesn't work for 
`ChangeInvisibleTime` because each entry requires its own new checkpoint write 
plus an ack of the old checkpoint, and each entry may have a different `invisibleTime`. A
list-based request body is necessary.
   
   3. **Async fire-and-forget (no per-entry response)**: For the 
`returnHandleGroup` scenario, per-entry results aren't strictly needed. 
However, the renewal scenario (`startRenewMessage`) requires per-entry new 
receipt handles to update the cached handle via 
`messageReceiptHandle.updateReceiptHandle()`. Supporting per-entry responses 
makes the batch API general-purpose across all scenarios.
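
The renewal case is what makes per-entry responses necessary. The sketch below matches results back to cached handles and assumes the Broker returns results in request order. `CachedHandle` and `EntryResult` are hypothetical simplifications of `MessageReceiptHandle` and the per-entry response. The `updateReceiptHandle(String)` signature here is illustrative. `newReceiptHandle` is assumed to be rebuilt by the client layer from the entry's `popTime`, `invisibleTime`, and `reviveQid`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: apply per-entry batch results back to cached handles
// during periodic renewal. Assumes results.get(i) answers handles.get(i).
final class RenewalResultApplier {

    /** Stand-in for the Proxy's cached MessageReceiptHandle. */
    static final class CachedHandle {
        private String receiptHandle;

        CachedHandle(String receiptHandle) { this.receiptHandle = receiptHandle; }

        String getReceiptHandle() { return receiptHandle; }

        void updateReceiptHandle(String newHandle) { this.receiptHandle = newHandle; }
    }

    /** Hypothetical per-entry outcome with an already rebuilt receipt handle. */
    static final class EntryResult {
        final boolean success;
        final String newReceiptHandle;

        EntryResult(boolean success, String newReceiptHandle) {
            this.success = success;
            this.newReceiptHandle = newReceiptHandle;
        }
    }

    /** Updates successful entries in place and returns the handles whose renewal failed. */
    static List<CachedHandle> apply(List<CachedHandle> handles, List<EntryResult> results) {
        if (handles.size() != results.size()) {
            throw new IllegalStateException("result count " + results.size()
                + " does not match request count " + handles.size());
        }
        List<CachedHandle> failed = new ArrayList<>();
        for (int i = 0; i < handles.size(); i++) {
            EntryResult r = results.get(i);
            if (r.success) {
                handles.get(i).updateReceiptHandle(r.newReceiptHandle);
            } else {
                failed.add(handles.get(i)); // keep the old handle; caller may retry or log
            }
        }
        return failed;
    }
}
```

A fire-and-forget batch could not drive this loop. A single aggregate status would not say which cached handles to replace.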
   
   ### Additional Context
   
   **RocksDB batch optimization detail:**
   
   Current (N messages = 2N `db.write()`):
   ```java
   // PopConsumerService.changeInvisibilityDuration() — called N times
   this.popConsumerStore.writeRecords(Collections.singletonList(ckRecord));   // 1 WriteBatch, 1 entry
   this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)); // 1 WriteBatch, 1 entry
   ```
   
   Proposed (N messages = 2 `db.write()`):
   ```java
   // PopConsumerService.batchChangeInvisibilityDuration() — called once
   this.popConsumerStore.writeRecords(allCkRecords);   // 1 WriteBatch, N entries
   this.popConsumerStore.deleteRecords(allAckRecords); // 1 WriteBatch, N entries
   ```
   
   `PopConsumerRocksdbStore.writeRecords/deleteRecords` already accept 
`List<PopConsumerRecord>` and use `WriteBatch` internally, so the 
infrastructure for batch RocksDB writes is already in place — we just need to 
stop passing `Collections.singletonList()`.
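
The sketch below makes the write-count difference concrete. It replaces `PopConsumerRocksdbStore` with a hypothetical in-memory `CountingStore` that counts each `writeRecords`/`deleteRecords` call as one `db.write()`. `PopRecord` is a simplified stand-in for `PopConsumerRecord`. For brevity, one invisible time is used for all entries, although real entries may each carry their own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch contrasting per-entry and batched store calls. CountingStore
// stands in for PopConsumerRocksdbStore: each writeRecords/deleteRecords call is
// counted as one db.write() of one WriteBatch, whatever the list size.
final class BatchChangeInvisibilitySketch {

    /** Simplified stand-in for PopConsumerRecord. */
    static final class PopRecord {
        final long offset;
        final long invisibleTimeMillis;

        PopRecord(long offset, long invisibleTimeMillis) {
            this.offset = offset;
            this.invisibleTimeMillis = invisibleTimeMillis;
        }
    }

    static final class CountingStore {
        int dbWrites = 0;
        final List<PopRecord> live = new ArrayList<>();

        void writeRecords(List<PopRecord> records) { live.addAll(records); dbWrites++; }

        void deleteRecords(List<PopRecord> records) { live.removeAll(records); dbWrites++; }
    }

    /** Current behaviour: two store calls per entry, so 2N db.write() calls. */
    static void changeOneByOne(CountingStore store, List<PopRecord> oldRecords, long newInvisibleMillis) {
        for (PopRecord old : oldRecords) {
            PopRecord ck = new PopRecord(old.offset, newInvisibleMillis);
            store.writeRecords(Collections.singletonList(ck));  // write new checkpoint first
            store.deleteRecords(Collections.singletonList(old)); // then remove the old one
        }
    }

    /** Proposed behaviour: collect all records first, then two store calls in total. */
    static void changeInBatch(CountingStore store, List<PopRecord> oldRecords, long newInvisibleMillis) {
        List<PopRecord> allCkRecords = new ArrayList<>(oldRecords.size());
        for (PopRecord old : oldRecords) {
            allCkRecords.add(new PopRecord(old.offset, newInvisibleMillis));
        }
        store.writeRecords(allCkRecords);
        store.deleteRecords(oldRecords);
    }
}
```

With five entries, the per-entry path performs 10 store writes and the batched path performs 2, and both end with the same five new checkpoints. RocksDB applies a `WriteBatch` atomically. As a result, a failed batched `db.write()` would fail all N entries together, and the per-entry response codes would reflect that shared failure.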
