qianye1001 opened a new issue, #10489:
URL: https://github.com/apache/rocketmq/issues/10489
### Before Creating the Enhancement Request
- [x] I have confirmed that this should be classified as an enhancement
rather than a bug/feature.
### Summary
Add `BatchChangeInvisibleTime` support in the Proxy and Broker, following
the existing `BatchAck` pattern. Currently, when the Proxy needs to change
the invisible time of multiple messages (client disconnect, write-back failure,
periodic renewal, message filtering), it sends individual
`CHANGE_MESSAGE_INVISIBLETIME` requests to the Broker one by one. For N
messages this costs N network round-trips and, on the PopConsumerKVService
(RocksDB) path, 2N `db.write()` calls, each of which forces a flush.
### Motivation
Multiple scenarios in the Proxy require batch `changeInvisibleTime`:
1. **Client disconnect** (`DefaultReceiptHandleManager.returnHandleGroup`):
When a client goes offline, the Proxy scans all unacked receipt handles for
that client and calls `changeInvisibleTime` for each one individually. A
consumer with many in-flight messages triggers N separate network round-trips.
2. **ReceiveMessage write-back failure**
(`ReceiveMessageResponseStreamWriter`): When writing messages back to the gRPC
client fails mid-stream, the Proxy nacks all remaining messages one by one.
3. **Periodic renewal** (`DefaultReceiptHandleManager.scheduleRenewTask`):
The Proxy periodically renews receipt handles that are about to expire. Each
handle is renewed individually via a separate `changeInvisibleTime` call.
4. **Pop message filtering** (`ConsumerProcessor.filterPopResult`): Messages
filtered as `TO_RETURN` have their invisible time changed one at a time.
On the Broker side with `PopConsumerKVServiceEnable=true`, each individual
`changeInvisibleTime` call in `PopConsumerService.changeInvisibilityDuration()`
does:
- `popConsumerStore.writeRecords(Collections.singletonList(ckRecord))` — 1
RocksDB WriteBatch with 1 entry
- `popConsumerStore.deleteRecords(Collections.singletonList(ackRecord))` — 1
RocksDB WriteBatch with 1 entry
For N messages, this results in 2N `db.write()` calls. Since
`PopConsumerRocksdbStore` uses `sync=true` on WriteOptions, each write forces a
WAL flush, making this a significant bottleneck.
### Describe the Solution You'd Like
Add a new `BATCH_CHANGE_MESSAGE_INVISIBLETIME` request code, following the
existing `BatchAck` (`BATCH_ACK_MESSAGE`) pattern:
**Request format** — Each entry carries the same parameters as the single
`ChangeInvisibleTimeRequestHeader`:
- `consumerGroup`, `topic`, `queueId`, `extraInfo`, `offset`,
`invisibleTime`, `liteTopic`, `suspend`
**Response format** — Per-entry results with new receipt handle info (same
as `ChangeInvisibleTimeResponseHeader`):
- `code` (SUCCESS or error), `popTime`, `invisibleTime`, `reviveQid`
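
As a rough illustration of the two formats above, the request and response bodies could be shaped like this. This is only a sketch: the class names, the Java field types, and the assumption that results are positional (result `i` answers entry `i`) are hypothetical and not taken from the RocketMQ code base.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: names and types are illustrative, not actual RocketMQ classes.
class BatchChangeInvisibleTimeBodies {

    /** One entry per message, carrying the same fields as the single-request header. */
    static class RequestEntry {
        String consumerGroup;
        String topic;
        int queueId;
        String extraInfo;
        long offset;
        long invisibleTime;
        String liteTopic;
        boolean suspend;
    }

    /** Per-entry result, carrying the data needed to build the new receipt handle. */
    static class ResultEntry {
        int code;            // SUCCESS or an error code
        long popTime;
        long invisibleTime;
        int reviveQid;
    }

    static class RequestBody {
        final List<RequestEntry> entries = new ArrayList<>();
    }

    static class ResponseBody {
        final List<ResultEntry> results = new ArrayList<>();
    }

    /** Assumption: results are positional, so both lists must have the same length. */
    static boolean isAligned(RequestBody request, ResponseBody response) {
        return request.entries.size() == response.results.size();
    }
}
```

A positional response keeps the body small; an alternative would be to echo `extraInfo` or `offset` in each result so the caller can match entries by key.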
**Layer-by-layer changes:**
1. **Remoting**: Add `RequestCode.BATCH_CHANGE_MESSAGE_INVISIBLETIME`,
`BatchChangeInvisibleTimeRequestBody` (with
`List<ChangeInvisibleTimeRequestEntry>`),
`BatchChangeInvisibleTimeResponseBody` (with per-entry results)
2. **Broker** — True batch processing:
- `PopConsumerService.batchChangeInvisibilityDuration()`: Collect all CK
records and ack records, then call `writeRecords(allCkRecords)` once and
`deleteRecords(allAckRecords)` once. For N messages, this reduces 2N
`db.write()` calls to just 2.
- `ChangeInvisibleTimeProcessor`: Handle
`BATCH_CHANGE_MESSAGE_INVISIBLETIME` — on KV path, delegate to the batch method
above; on traditional revive path, iterate and call
`appendCheckPointThenAckOrigin()` per entry.
- `BrokerController`: Register the new request code to
`changeInvisibleTimeProcessor`.
3. **Client**: Add `batchChangeInvisibleTimeAsync()` to `MQClientAPIImpl`
and `MQClientAPIExt`.
4. **Proxy Service**: Add `batchChangeInvisibleTime()` to `MessageService`,
`ClusterMessageService`, and `LocalMessageService`.
5. **Proxy Processor**: Add `batchChangeInvisibleTime()` to
`ConsumerProcessor` (group by broker, filter expired handles),
`MessagingProcessor` interface, and `DefaultMessagingProcessor`.
6. **Caller integration**:
- `DefaultReceiptHandleManager.returnHandleGroup()`: Collect all handles,
batch send
- `DefaultReceiptHandleManager.scheduleRenewTask()`: Collect handles
needing renewal per group key, batch send, update each handle with per-entry
result
- `ReceiveMessageResponseStreamWriter`: Collect all nack messages, batch
send
- `ConsumerProcessor.filterPopResult()`: Collect all TO_RETURN messages,
batch send
7. **Config**: Add `enableBatchChangeInvisibleTime` flag (default false) for
gradual rollout.
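
The "group by broker, filter expired handles" step in the Proxy Processor layer could look roughly like the following. It is a minimal sketch under stated assumptions: the `Handle` type, its fields, and `groupLiveHandles` are hypothetical stand-ins, and the real receipt-handle class in the Proxy exposes this information differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the Proxy-side grouping step; not actual RocketMQ code.
class GroupByBrokerSketch {

    /** Simplified stand-in for a receipt handle held by the Proxy. */
    static class Handle {
        final String brokerName;
        final String receiptHandle;
        final long expireAtMillis;

        Handle(String brokerName, String receiptHandle, long expireAtMillis) {
            this.brokerName = brokerName;
            this.receiptHandle = receiptHandle;
            this.expireAtMillis = expireAtMillis;
        }
    }

    /**
     * Drops handles that have already expired and groups the rest by broker,
     * so that one batch request can be sent per broker.
     */
    static Map<String, List<Handle>> groupLiveHandles(List<Handle> handles, long nowMillis) {
        Map<String, List<Handle>> byBroker = new LinkedHashMap<>();
        for (Handle handle : handles) {
            if (handle.expireAtMillis <= nowMillis) {
                continue; // expired: changing its invisible time would fail anyway
            }
            byBroker.computeIfAbsent(handle.brokerName, k -> new ArrayList<>()).add(handle);
        }
        return byBroker;
    }
}
```

Each map value would then become the entry list of one `BATCH_CHANGE_MESSAGE_INVISIBLETIME` request, so the number of round-trips is bounded by the number of brokers rather than the number of messages.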
### Describe Alternatives You've Considered
1. **Proxy-level batching only (no Broker changes)**: Group entries by
broker at the Proxy, but still send individual `CHANGE_MESSAGE_INVISIBLETIME`
requests. This reduces Proxy-side processing overhead but not the number of
network round-trips or RocksDB flushes. Rejected because the Broker-side
RocksDB flush reduction is the main performance win.
2. **Reuse `BATCH_ACK_MESSAGE` with BitSet merging**: `BatchAck` merges
multiple offsets sharing the same CK into a BitSet. This doesn't work for
`ChangeInvisibleTime` because each entry requires its own new checkpoint write
and old checkpoint ack, and each entry may have a different `invisibleTime`. A
list-based request body is necessary.
3. **Async fire-and-forget (no per-entry response)**: For the
`returnHandleGroup` scenario, per-entry results aren't strictly needed.
However, the renewal scenario (`startRenewMessage`) requires per-entry new
receipt handles to update the cached handle via
`messageReceiptHandle.updateReceiptHandle()`. Supporting per-entry responses
makes the batch API general-purpose across all scenarios.
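
To illustrate why the renewal path needs per-entry results, here is a minimal sketch of applying a batch response to cached handles. The `CachedHandle` and `EntryResult` types, the `SUCCESS = 0` constant, and the positional matching of results to handles are assumptions made for this example; the actual update goes through `messageReceiptHandle.updateReceiptHandle()`.

```java
import java.util.List;

// Hypothetical sketch of per-entry result handling; not actual RocketMQ code.
class RenewalResultSketch {
    static final int SUCCESS = 0; // assumed success code for this sketch

    /** Simplified stand-in for the cached receipt-handle state. */
    static class CachedHandle {
        long popTime;
        long invisibleTime;
        int reviveQid;

        CachedHandle(long popTime, long invisibleTime, int reviveQid) {
            this.popTime = popTime;
            this.invisibleTime = invisibleTime;
            this.reviveQid = reviveQid;
        }
    }

    /** Simplified stand-in for one per-entry result in the batch response. */
    static class EntryResult {
        final int code;
        final long popTime;
        final long invisibleTime;
        final int reviveQid;

        EntryResult(int code, long popTime, long invisibleTime, int reviveQid) {
            this.code = code;
            this.popTime = popTime;
            this.invisibleTime = invisibleTime;
            this.reviveQid = reviveQid;
        }
    }

    /** Updates each handle whose entry succeeded; failed entries keep their old state. */
    static int applyResults(List<CachedHandle> handles, List<EntryResult> results) {
        if (handles.size() != results.size()) {
            throw new IllegalArgumentException("result count does not match entry count");
        }
        int updated = 0;
        for (int i = 0; i < handles.size(); i++) {
            EntryResult result = results.get(i);
            if (result.code != SUCCESS) {
                continue; // leave the cached handle untouched on failure
            }
            CachedHandle handle = handles.get(i);
            handle.popTime = result.popTime;
            handle.invisibleTime = result.invisibleTime;
            handle.reviveQid = result.reviveQid;
            updated++;
        }
        return updated;
    }
}
```

With a fire-and-forget design there would be nothing to feed into this loop, and the cached handles would go stale after the first renewal.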
### Additional Context
**RocksDB batch optimization detail:**
Current (N messages = 2N `db.write()`):
```java
// PopConsumerService.changeInvisibilityDuration() — called N times
this.popConsumerStore.writeRecords(Collections.singletonList(ckRecord));   // 1 WriteBatch, 1 entry
this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord)); // 1 WriteBatch, 1 entry
```
Proposed (N messages = 2 `db.write()`):
```java
// PopConsumerService.batchChangeInvisibilityDuration() — called once
this.popConsumerStore.writeRecords(allCkRecords);   // 1 WriteBatch, N entries
this.popConsumerStore.deleteRecords(allAckRecords); // 1 WriteBatch, N entries
```
`PopConsumerRocksdbStore.writeRecords/deleteRecords` already accept
`List<PopConsumerRecord>` and use `WriteBatch` internally, so the
infrastructure for batch RocksDB writes is already in place — we just need to
stop passing `Collections.singletonList()`.
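
The 2N versus 2 arithmetic can be checked with a small stand-alone model. This is a sketch only: `CountingStore` is a hypothetical stand-in that counts one `db.write()` per `writeRecords`/`deleteRecords` call and uses strings in place of `PopConsumerRecord`; it does not touch RocksDB.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the write-count difference; not actual RocketMQ code.
class WriteCountSketch {

    /** Stand-in for the store: every call is modelled as one db.write(). */
    static class CountingStore {
        int dbWrites = 0;

        void writeRecords(List<String> records) {
            dbWrites++;
        }

        void deleteRecords(List<String> records) {
            dbWrites++;
        }
    }

    /** Current behaviour: one write and one delete per message. */
    static int perMessageWrites(int messageCount) {
        CountingStore store = new CountingStore();
        for (int i = 0; i < messageCount; i++) {
            store.writeRecords(Collections.singletonList("ck-" + i));
            store.deleteRecords(Collections.singletonList("ack-" + i));
        }
        return store.dbWrites;
    }

    /** Proposed behaviour: collect all records first, then write and delete once. */
    static int batchedWrites(int messageCount) {
        CountingStore store = new CountingStore();
        List<String> allCkRecords = new ArrayList<>();
        List<String> allAckRecords = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            allCkRecords.add("ck-" + i);
            allAckRecords.add("ack-" + i);
        }
        store.writeRecords(allCkRecords);
        store.deleteRecords(allAckRecords);
        return store.dbWrites;
    }

    public static void main(String[] args) {
        System.out.println("per-message: " + perMessageWrites(100));
        System.out.println("batched: " + batchedWrites(100));
    }
}
```

For 100 messages the model reports 200 writes on the per-message path and 2 on the batched path, matching the counts stated above.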