qianye1001 opened a new issue, #1225:
URL: https://github.com/apache/rocketmq-clients/issues/1225
### Programming Language of the Client

Java

### Is Your Feature Request Related to a Problem?

Currently, the Java SimpleConsumer only supports single `receive()` calls, so users who need to process messages in batches must manage concurrency and aggregation themselves. Similarly, PushConsumer lacks a built-in mechanism to accumulate messages into batches before invoking the message listener; users who want batch semantics must implement their own buffering layer on top of the existing per-message callback.

This creates two problems:

1. **SimpleConsumer**: There is no way to efficiently pull a batch of messages with a single API call. Users must write their own retry loops, concurrency management, and aggregation logic.
2. **PushConsumer**: There is no built-in batch consumption. Every message triggers a separate listener invocation, which is inefficient for workloads that benefit from batch processing (e.g., batch database writes, bulk API calls).

### Describe the Solution You'd Like

Add first-class batch support to both consumer models, controlled by a shared `BatchPolicy`:

**SimpleConsumer: Adaptive Batch Receive**

- New API: `batchReceive(Duration)`, `batchReceive(String topic, Duration)`, and their async variants.
- Implementation (`BatchingSimpleConsumerImpl`): a decorator that converts each `batchReceive()` call into adaptive concurrent `receiveAsync()` calls. In each round, the algorithm splits the remaining message count evenly across `min(ceil(remaining / 32), maxInflight)` concurrent receives, waits for all of them to complete (none is ever cancelled), and starts the `maxWaitTime` deadline only after the first message arrives.
- New `batchAck()` / `batchAckAsync()` for efficient batch acknowledgment, grouped by (topic, endpoints).

**PushConsumer: Batch Consume Service**

- New `BatchMessageListener` interface and `PushConsumerBuilder.setBatchPolicy()` / `setBatchMessageListener()` API.
- Implementation (`BatchConsumeService`): accumulates messages from all ProcessQueues into a single `BatchBuffer` and flushes them to the `BatchMessageListener` as soon as any `BatchPolicy` threshold (count, bytes, or time) is reached.

**Shared BatchPolicy**

- Controls: `maxBatchCount`, `maxBatchBytes`, `maxWaitTime`, and `maxInflight` (SimpleConsumer only).
- Builder pattern with sensible defaults (32 messages, 4 MB, 5 seconds, 4 inflight requests).

### Describe Alternatives You've Considered

1. **Client-side buffering with a background fetch thread**: An earlier design used a background worker thread with a request queue and an overflow buffer. It was more complex, harder to reason about, and introduced cache management and eviction problems. The current adaptive approach is simpler: no background thread, no cache, and no queue.
2. **Server-side batch receive**: Relying on the server to support a native batch-receive RPC. This would require server-side protocol changes and is a longer-term effort, whereas the client-side approach works with the existing server protocol.

### Additional Context

Key design decisions:

- **Deadline from first message**: The `maxWaitTime` clock starts only after the first message is received, not when the call starts. This avoids prematurely returning empty batches.
- **Inflight requests always complete**: Once receive calls are fired, they are never cancelled. The consumer decides whether to start a new round only after all inflight requests have returned.
- **No excess messages**: In each round, the total count requested across concurrent receives equals exactly the remaining count needed.
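The adaptive round loop from the SimpleConsumer section, together with the three design decisions above, can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the `BatchingSimpleConsumerImpl` code: `AsyncReceiver` and `AdaptiveBatchReceiver` are hypothetical names, `AsyncReceiver` stands in for the `receiveAsync()` calls, messages are plain strings, and `maxRounds` is an assumed safety bound that the proposal does not mention.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the SimpleConsumer receiveAsync() calls; messages are plain strings.
interface AsyncReceiver {
    CompletableFuture<List<String>> receiveAsync(int maxMessageNum);
}

final class AdaptiveBatchReceiver {
    // The divisor in min(ceil(remaining / 32), maxInflight).
    static final int MESSAGES_PER_RECEIVE = 32;

    // Splits `remaining` across min(ceil(remaining / 32), maxInflight) receives
    // (assumes remaining >= 1 and maxInflight >= 1). The first (remaining % n)
    // requests get one extra message, so the sizes sum to exactly `remaining`.
    static int[] plan(int remaining, int maxInflight) {
        int n = Math.min((remaining + MESSAGES_PER_RECEIVE - 1) / MESSAGES_PER_RECEIVE, maxInflight);
        int[] sizes = new int[n];
        for (int i = 0; i < n; i++) {
            sizes[i] = remaining / n + (i < remaining % n ? 1 : 0);
        }
        return sizes;
    }

    // `maxRounds` is a hypothetical guard for this sketch so that an empty topic
    // cannot loop forever; the proposal does not specify one.
    static List<String> batchReceive(AsyncReceiver receiver, int maxBatchCount, int maxInflight,
                                     Duration maxWaitTime, int maxRounds) {
        List<String> batch = new ArrayList<>();
        AtomicReference<Long> firstArrival = new AtomicReference<>(); // null until a message arrives
        for (int round = 0; round < maxRounds && batch.size() < maxBatchCount; round++) {
            List<CompletableFuture<List<String>>> inflight = new ArrayList<>();
            for (int size : plan(maxBatchCount - batch.size(), maxInflight)) {
                inflight.add(receiver.receiveAsync(size).whenComplete((msgs, err) -> {
                    // Start the maxWaitTime clock when the first non-empty response lands.
                    if (msgs != null && !msgs.isEmpty()) {
                        firstArrival.compareAndSet(null, System.nanoTime());
                    }
                }));
            }
            // Wait for every request of the round; none is ever cancelled.
            // A failed receive surfaces here as a CompletionException.
            CompletableFuture.allOf(inflight.toArray(new CompletableFuture[0])).join();
            for (CompletableFuture<List<String>> f : inflight) {
                batch.addAll(f.join());
            }
            // Only between rounds do we decide whether to start another one.
            Long arrival = firstArrival.get();
            if (arrival != null && System.nanoTime() - arrival >= maxWaitTime.toNanos()) {
                break;
            }
        }
        return batch;
    }
}
```

Because each round waits on `allOf(...)`, a slow receive delays the round but is never abandoned. The trade-off in this sketch is that the `maxWaitTime` deadline is only checked between rounds, so a call can return up to one round's latency after the deadline.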

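For the PushConsumer side, the flushing rule of `BatchConsumeService` (flush when any `BatchPolicy` threshold on count, bytes, or time is reached) can be sketched as a single shared buffer. The class names `BatchPolicy` and `BatchBuffer` come from the proposal, but every field, builder method, the `tick()` method, and the `Consumer` callback standing in for `BatchMessageListener` are hypothetical. Treating "4MB" as 4 * 1024 * 1024 bytes and measuring the time threshold from the oldest buffered message are also assumptions.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical shape of the proposed BatchPolicy, using the defaults listed in the issue.
final class BatchPolicy {
    final int maxBatchCount;
    final long maxBatchBytes;
    final Duration maxWaitTime;
    final int maxInflight; // SimpleConsumer only; unused by the buffer below

    private BatchPolicy(Builder b) {
        this.maxBatchCount = b.maxBatchCount;
        this.maxBatchBytes = b.maxBatchBytes;
        this.maxWaitTime = b.maxWaitTime;
        this.maxInflight = b.maxInflight;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        private int maxBatchCount = 32;
        private long maxBatchBytes = 4L * 1024 * 1024; // "4MB", assumed to mean 4 MiB
        private Duration maxWaitTime = Duration.ofSeconds(5);
        private int maxInflight = 4;

        Builder setMaxBatchCount(int v) { maxBatchCount = v; return this; }
        Builder setMaxBatchBytes(long v) { maxBatchBytes = v; return this; }
        Builder setMaxWaitTime(Duration v) { maxWaitTime = v; return this; }
        Builder setMaxInflight(int v) { maxInflight = v; return this; }
        BatchPolicy build() { return new BatchPolicy(this); }
    }
}

// Hypothetical BatchBuffer shared by all ProcessQueues; message bodies are byte[].
final class BatchBuffer {
    private final BatchPolicy policy;
    private final Consumer<List<byte[]>> listener; // stands in for BatchMessageListener
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes;
    private long oldestNanos; // arrival time of the oldest buffered message

    BatchBuffer(BatchPolicy policy, Consumer<List<byte[]>> listener) {
        this.policy = policy;
        this.listener = listener;
    }

    // Count and byte thresholds are checked on every add; a batch may overshoot
    // maxBatchBytes by the size of its last message.
    synchronized void add(byte[] body, long nowNanos) {
        if (pending.isEmpty()) {
            oldestNanos = nowNanos;
        }
        pending.add(body);
        pendingBytes += body.length;
        if (pending.size() >= policy.maxBatchCount || pendingBytes >= policy.maxBatchBytes) {
            flush();
        }
    }

    // Time threshold, meant to be called periodically (e.g. by a scheduler).
    synchronized void tick(long nowNanos) {
        if (!pending.isEmpty() && nowNanos - oldestNanos >= policy.maxWaitTime.toNanos()) {
            flush();
        }
    }

    // Hands a copy of the batch to the listener (while holding the lock) and resets the buffer.
    private void flush() {
        listener.accept(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }
}
```

Passing `nowNanos` explicitly keeps the sketch deterministic; a real service would drive `tick()` from a scheduled executor. Invoking the listener while the lock is held keeps ordering simple but blocks producers for the duration of a slow flush.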