qianye1001 opened a new issue, #1225:
URL: https://github.com/apache/rocketmq-clients/issues/1225

   ### Programming Language of the Client
   
   Java
   
   ### Is Your Feature Request Related to a Problem?
   
   Currently, the Java SimpleConsumer supports only individual `receive()`
calls, so users who need to process messages in batches must manage
concurrency and aggregation themselves. Similarly, PushConsumer has no built-in
mechanism for accumulating messages into batches before invoking the message
listener: users who want batch semantics must build their own buffering layer
on top of the existing per-message callback.
   
   This creates two problems:
   1. **SimpleConsumer**: No way to efficiently pull a batch of messages with a 
single API call. Users must write their own retry loops, concurrency 
management, and aggregation logic.
   2. **PushConsumer**: No built-in batch consumption; every message triggers
a separate listener invocation, which is inefficient for workloads that benefit
from batch processing (e.g., batch database writes, bulk API calls).
   
   ### Describe the Solution You'd Like
   
   Add first-class batch support to both consumer models, controlled by a 
shared `BatchPolicy`:
   
   **SimpleConsumer — Adaptive Batch Receive**
   - New API: `batchReceive(Duration)`, `batchReceive(String topic, Duration)`, 
and their async variants.
   - Implementation (`BatchingSimpleConsumerImpl`): a decorator that turns each
`batchReceive()` call into adaptive, concurrent `receiveAsync()` calls. In each
round, it splits the remaining message count evenly across
`min(ceil(remaining / 32), maxInflight)` concurrent receives, waits for all of
them to complete (they are never cancelled), and starts the `maxWaitTime`
deadline only after the first message arrives.
   - New `batchAck()` / `batchAckAsync()` for efficient batch acknowledgment 
grouped by (topic, endpoints).
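
The grouping step behind `batchAck()` could look roughly like the sketch below (Java 16+ for records). It assumes a hypothetical `AckTarget` record carrying a message's topic, endpoints (as a plain string), and receipt handle; the real client types are not described in this issue, so every name here is illustrative only. Messages are bucketed by (topic, endpoints), presumably so that each bucket can be acknowledged with a single request.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BatchAckGrouping {
    // Hypothetical stand-in for a received message; not the real client type.
    public record AckTarget(String topic, String endpoints, String receiptHandle) {}

    // Grouping key: messages that share a topic and endpoints land in one bucket.
    public record GroupKey(String topic, String endpoints) {}

    public static Map<GroupKey, List<AckTarget>> groupForAck(List<AckTarget> messages) {
        return messages.stream().collect(Collectors.groupingBy(
                m -> new GroupKey(m.topic(), m.endpoints()),
                LinkedHashMap::new,      // keep groups in first-seen order
                Collectors.toList()));
    }

    public static void main(String[] args) {
        List<AckTarget> received = List.of(
                new AckTarget("orders", "broker-a:8081", "h1"),
                new AckTarget("orders", "broker-b:8081", "h2"),
                new AckTarget("orders", "broker-a:8081", "h3"));
        // One (hypothetical) ack request would be sent per printed group.
        groupForAck(received).forEach((key, group) ->
                System.out.println(key + " -> " + group.size() + " message(s)"));
    }
}
```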
   
   **PushConsumer — Batch Consume Service**
   - New `BatchMessageListener` interface and 
`PushConsumerBuilder.setBatchPolicy()/setBatchMessageListener()` API.
   - Implementation (`BatchConsumeService`): accumulates messages from all
`ProcessQueue`s into a single `BatchBuffer` and flushes it to the
`BatchMessageListener` when any `BatchPolicy` threshold (count, bytes, or time)
is reached.
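
A shared buffer that flushes on whichever threshold is hit first (count, bytes, or time) might be sketched as follows. Everything here is hypothetical: `SimpleBatchBuffer`, its `add`/`tick` methods, and the `Consumer<List<byte[]>>` standing in for `BatchMessageListener` are illustrative names, not the proposed API. Measuring the time threshold from the first buffered message, and driving it with an explicit clock value, are assumptions made to keep the logic easy to follow and test.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SimpleBatchBuffer {
    private final int maxCount;
    private final long maxBytes;
    private final long maxWaitNanos;
    private final Consumer<List<byte[]>> listener; // hypothetical stand-in for BatchMessageListener

    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private long firstArrivalNanos = 0;

    public SimpleBatchBuffer(int maxCount, long maxBytes, Duration maxWait,
                             Consumer<List<byte[]>> listener) {
        this.maxCount = maxCount;
        this.maxBytes = maxBytes;
        this.maxWaitNanos = maxWait.toNanos();
        this.listener = listener;
    }

    /** Called by every process queue; flushes once the count or byte threshold is reached. */
    public void add(byte[] body, long nowNanos) {
        List<byte[]> ready = null;
        synchronized (this) {
            if (pending.isEmpty()) {
                firstArrivalNanos = nowNanos; // batch age is measured from its first message
            }
            pending.add(body);
            pendingBytes += body.length;
            if (pending.size() >= maxCount || pendingBytes >= maxBytes) {
                ready = drain();
            }
        }
        if (ready != null) {
            listener.accept(ready); // deliver outside the lock
        }
    }

    /** Called periodically (e.g. by a scheduler); flushes a non-empty batch that is too old. */
    public void tick(long nowNanos) {
        List<byte[]> ready = null;
        synchronized (this) {
            if (!pending.isEmpty() && nowNanos - firstArrivalNanos >= maxWaitNanos) {
                ready = drain();
            }
        }
        if (ready != null) {
            listener.accept(ready);
        }
    }

    private List<byte[]> drain() {
        List<byte[]> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        return batch;
    }

    public static void main(String[] args) {
        SimpleBatchBuffer buffer = new SimpleBatchBuffer(3, 4L * 1024 * 1024, Duration.ofSeconds(5),
                batch -> System.out.println("flushed " + batch.size() + " message(s)"));
        long t0 = System.nanoTime();
        for (int i = 0; i < 4; i++) {
            buffer.add(new byte[100], t0); // the third add triggers a count-based flush
        }
        buffer.tick(t0 + Duration.ofSeconds(6).toNanos()); // the leftover message is flushed by time
    }
}
```

This sketch checks the thresholds after appending, so a batch can overshoot the byte limit by one message; a real implementation might instead flush before appending a message that would cross the limit.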
   
   **Shared BatchPolicy**
   - Controls: `maxBatchCount`, `maxBatchBytes`, `maxWaitTime`, `maxInflight` 
(SimpleConsumer only).
   - Built via a builder, with defaults of 32 messages (`maxBatchCount`), 4 MB
(`maxBatchBytes`), 5 seconds (`maxWaitTime`), and 4 inflight requests
(`maxInflight`).
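
A builder carrying these defaults could be sketched as below. The class, getter, and setter names only mirror the field names in this issue and are hypothetical; the validation rules are an assumption, and "4MB" is read here as 4 × 1024 × 1024 bytes, since the proposal does not say whether it means MiB or 10^6 bytes.

```java
import java.time.Duration;

public final class BatchPolicy {
    private final int maxBatchCount;
    private final long maxBatchBytes;
    private final Duration maxWaitTime;
    private final int maxInflight; // consulted only by SimpleConsumer batch receive

    private BatchPolicy(Builder builder) {
        this.maxBatchCount = builder.maxBatchCount;
        this.maxBatchBytes = builder.maxBatchBytes;
        this.maxWaitTime = builder.maxWaitTime;
        this.maxInflight = builder.maxInflight;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    public int getMaxBatchCount() { return maxBatchCount; }
    public long getMaxBatchBytes() { return maxBatchBytes; }
    public Duration getMaxWaitTime() { return maxWaitTime; }
    public int getMaxInflight() { return maxInflight; }

    public static final class Builder {
        // Defaults from the proposal; 4 MB is assumed to mean 4 * 1024 * 1024 bytes.
        private int maxBatchCount = 32;
        private long maxBatchBytes = 4L * 1024 * 1024;
        private Duration maxWaitTime = Duration.ofSeconds(5);
        private int maxInflight = 4;

        public Builder setMaxBatchCount(int maxBatchCount) {
            this.maxBatchCount = maxBatchCount;
            return this;
        }

        public Builder setMaxBatchBytes(long maxBatchBytes) {
            this.maxBatchBytes = maxBatchBytes;
            return this;
        }

        public Builder setMaxWaitTime(Duration maxWaitTime) {
            this.maxWaitTime = maxWaitTime;
            return this;
        }

        public Builder setMaxInflight(int maxInflight) {
            this.maxInflight = maxInflight;
            return this;
        }

        public BatchPolicy build() {
            // Assumed validation: limits must be positive and the wait time non-negative.
            if (maxBatchCount <= 0 || maxBatchBytes <= 0 || maxInflight <= 0) {
                throw new IllegalArgumentException("batch limits must be positive");
            }
            if (maxWaitTime == null || maxWaitTime.isNegative()) {
                throw new IllegalArgumentException("maxWaitTime must not be negative");
            }
            return new BatchPolicy(this);
        }
    }

    public static void main(String[] args) {
        BatchPolicy defaults = BatchPolicy.newBuilder().build();
        BatchPolicy custom = BatchPolicy.newBuilder()
                .setMaxBatchCount(100)
                .setMaxWaitTime(Duration.ofSeconds(1))
                .build();
        System.out.println(defaults.getMaxBatchCount() + " / " + defaults.getMaxBatchBytes()
                + " bytes / " + defaults.getMaxWaitTime() + " / " + defaults.getMaxInflight());
        System.out.println(custom.getMaxBatchCount() + " / " + custom.getMaxWaitTime());
    }
}
```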
   
   ### Describe Alternatives You've Considered
   
   1. **Client-side buffering with background fetch thread**: An earlier design
used a background worker thread with a request queue and an overflow buffer.
This was more complex, harder to reason about, and introduced cache
management and eviction problems. The current adaptive approach is simpler: no
background thread, no cache, no queue.
   
   2. **Server-side batch receive**: Relying on the server to support a native 
batch-receive RPC. This would require server-side protocol changes and is a 
longer-term effort. The client-side approach works with the existing server 
protocol.
   
   ### Additional Context
   
   Key design decisions:
   - **Deadline from first message**: The `maxWaitTime` clock starts only after
the first message is received, not when the call starts. This avoids returning
empty batches prematurely.
   - **Inflight requests always complete**: Once receive calls are fired, they 
are never cancelled. The system only decides whether to start a new round after 
all inflight requests return.
   - **No excess messages**: The total count requested across concurrent 
receives in a round equals exactly the remaining count needed.
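
The three decisions above combine into a single receive loop, sketched here without any RocketMQ dependency. `ReceiveFn` is a hypothetical stand-in for `receiveAsync()`, `split` applies the `min(ceil(remaining / 32), maxInflight)` formula from the proposal, and errors and timeouts are ignored. How an entirely empty round is handled is not specified in the issue; this sketch assumes the call then returns whatever it has. It illustrates the control flow only and is not the actual `BatchingSimpleConsumerImpl`.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class AdaptiveBatchReceive {
    /** Hypothetical stand-in for receiveAsync(): completes with at most maxMessageNum messages. */
    public interface ReceiveFn<T> {
        CompletableFuture<List<T>> receive(int maxMessageNum);
    }

    private static final int PER_REQUEST_TARGET = 32; // the "32" in ceil(remaining / 32)
    private static final long UNSET = Long.MIN_VALUE;

    /** Splits `remaining` evenly over min(ceil(remaining / 32), maxInflight) requests. */
    public static int[] split(int remaining, int maxInflight) {
        int requests = Math.min((remaining + PER_REQUEST_TARGET - 1) / PER_REQUEST_TARGET, maxInflight);
        int[] sizes = new int[requests];
        for (int i = 0; i < requests; i++) {
            // The first (remaining % requests) requests take one extra message,
            // so the sizes always sum to exactly `remaining` (no excess messages).
            sizes[i] = remaining / requests + (i < remaining % requests ? 1 : 0);
        }
        return sizes;
    }

    public static <T> List<T> batchReceive(ReceiveFn<T> receiveFn, int maxBatchCount,
                                           int maxInflight, Duration maxWaitTime) {
        List<T> batch = new ArrayList<>();
        AtomicLong firstArrivalNanos = new AtomicLong(UNSET);
        while (batch.size() < maxBatchCount) {
            List<CompletableFuture<List<T>>> round = new ArrayList<>();
            for (int n : split(maxBatchCount - batch.size(), maxInflight)) {
                round.add(receiveFn.receive(n).thenApply(msgs -> {
                    if (!msgs.isEmpty()) {
                        // Start the maxWaitTime clock when the first message arrives.
                        firstArrivalNanos.compareAndSet(UNSET, System.nanoTime());
                    }
                    return msgs;
                }));
            }
            // Wait for every request in the round; none of them is ever cancelled.
            CompletableFuture.allOf(round.toArray(new CompletableFuture<?>[0])).join();
            int before = batch.size();
            for (CompletableFuture<List<T>> f : round) {
                batch.addAll(f.join());
            }
            if (batch.size() == before) {
                break; // assumption: an entirely empty round ends the call
            }
            // Only now, with the whole round finished, decide whether to start another.
            if (System.nanoTime() - firstArrivalNanos.get() >= maxWaitTime.toNanos()) {
                break;
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        // Fake receiver over 100 queued messages; each call returns up to n of them at once.
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < 100; i++) {
            queue.add(i);
        }
        ReceiveFn<Integer> fake = n -> {
            List<Integer> out = new ArrayList<>();
            synchronized (queue) {
                while (out.size() < n && !queue.isEmpty()) {
                    out.add(queue.poll());
                }
            }
            return CompletableFuture.completedFuture(out);
        };
        System.out.println(Arrays.toString(split(100, 4))); // [25, 25, 25, 25]
        List<Integer> batch = batchReceive(fake, 100, 4, Duration.ofSeconds(5));
        System.out.println("received " + batch.size() + " message(s)");
    }
}
```

Because `split` follows the formula literally, each request asks for more than 32 messages once the remaining count exceeds 32 × `maxInflight` (for example, 200 remaining with `maxInflight` = 4 yields four requests of 50). The deadline is also checked only between rounds, so a call can return somewhat after `maxWaitTime` if a round is still in flight when it expires; that is the cost of never cancelling inflight requests.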
   

