qianye1001 opened a new pull request, #1226:
URL: https://github.com/apache/rocketmq-clients/pull/1226

   ### Which Issue(s) This PR Fixes
   
   Fixes #1225
   
   ### Brief Description
   
   This PR adds first-class batch support to both **SimpleConsumer** and 
**PushConsumer** in the Java client, controlled by a shared `BatchPolicy`.
   
   #### SimpleConsumer — Adaptive Batch Receive
   
   `BatchingSimpleConsumerImpl` is a `SimpleConsumer` decorator that converts 
`batchReceive()` into adaptive concurrent `receiveAsync()` calls:
   
   - **Fully asynchronous**: the core `fetchRound()` method chains futures via 
`CompletableFuture.thenCompose` — no blocking, no background thread, no local 
cache.
   - **Adaptive concurrency**: each round fires `min(ceil(remaining/32), 
maxInflight)` concurrent receives and splits the remaining count evenly across 
them.
   - **Deadline from first message**: the `maxWaitTime` clock only starts after 
the first message arrives.
   - **Inflight requests always complete**: fired receives are never cancelled; 
the system waits for all to return before deciding on the next round.
   - **No excess messages**: the total count requested across concurrent 
receives equals exactly the remaining count.
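
The round logic described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in this PR: `AdaptiveFetchSketch`, `planRound`, `PER_RECEIVE_HINT`, and the `IntFunction` standing in for `receiveAsync()` are hypothetical names, and messages are modelled as plain strings. The sketch splits the remaining count evenly without clamping each share to 32, since the description does not say how shares are sized when the `maxInflight` cap applies. It also leaves out the `maxWaitTime` deadline and error handling, and simply stops when a round returns nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

class AdaptiveFetchSketch {
    // Taken from the 32 in the PR's formula; its exact meaning is assumed here.
    static final int PER_RECEIVE_HINT = 32;

    /** Returns the message count to request from each concurrent receive in one round. */
    static List<Integer> planRound(int remaining, int maxInflight) {
        List<Integer> shares = new ArrayList<>();
        if (remaining <= 0 || maxInflight <= 0) {
            return shares;
        }
        int byCount = (remaining + PER_RECEIVE_HINT - 1) / PER_RECEIVE_HINT; // ceil(remaining / 32)
        int n = Math.min(byCount, maxInflight);
        int base = remaining / n;
        int extra = remaining % n; // the first `extra` receives ask for one more message
        for (int i = 0; i < n; i++) {
            shares.add(base + (i < extra ? 1 : 0));
        }
        return shares; // the shares always sum to `remaining`
    }

    /** Fires one round, waits for every inflight receive, then chains the next round if needed. */
    static CompletableFuture<List<String>> fetchRound(
            IntFunction<CompletableFuture<List<String>>> receiveAsync,
            int remaining, int maxInflight, List<String> collected) {
        List<CompletableFuture<List<String>>> inflight = new ArrayList<>();
        for (int share : planRound(remaining, maxInflight)) {
            inflight.add(receiveAsync.apply(share));
        }
        // allOf completes only after every fired receive has returned; nothing is cancelled.
        return CompletableFuture.allOf(inflight.toArray(new CompletableFuture[0]))
                .thenCompose(ignored -> {
                    int got = 0;
                    for (CompletableFuture<List<String>> f : inflight) {
                        List<String> part = f.join(); // already complete, does not block
                        collected.addAll(part);
                        got += part.size();
                    }
                    int left = remaining - got;
                    if (got == 0 || left <= 0) {
                        return CompletableFuture.completedFuture(collected);
                    }
                    return fetchRound(receiveAsync, left, maxInflight, collected);
                });
    }

    public static void main(String[] args) {
        System.out.println(planRound(70, 4)); // prints [24, 23, 23]
    }
}
```

With 70 messages remaining and `maxInflight` of 4, the formula gives `ceil(70/32) = 3` receives, and the shares 24, 23, and 23 add up to exactly 70.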
   
   New interface methods:
   - `batchReceive(Duration)` / `batchReceiveAsync(Duration)` — round-robin 
across subscribed topics
   - `batchReceive(String topic, Duration)` / `batchReceiveAsync(String topic, 
Duration)` — topic-specific
   - `batchAck(List<MessageView>)` / `batchAckAsync(List<MessageView>)` — 
grouped by (topic, endpoints), chunked
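
As a rough illustration of "grouped by (topic, endpoints), chunked", the sketch below groups messages by that pair and cuts each group into fixed-size chunks. It is only a sketch: `AckChunker`, `Msg`, and `groupAndChunk` are hypothetical names, `Msg` is a stand-in for `MessageView` that carries only the fields needed here, and the chunk size is a parameter because the description does not state the value the PR uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AckChunker {
    /** Hypothetical stand-in for MessageView with only the grouping fields. */
    static final class Msg {
        final String topic;
        final String endpoints;
        final String id;

        Msg(String topic, String endpoints, String id) {
            this.topic = topic;
            this.endpoints = endpoints;
            this.id = id;
        }
    }

    /** Groups by (topic, endpoints) in first-seen order, then splits each group into chunks. */
    static List<List<Msg>> groupAndChunk(List<Msg> messages, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        Map<List<String>, List<Msg>> groups = new LinkedHashMap<>();
        for (Msg m : messages) {
            // A two-element list works as a composite key via List.equals/hashCode.
            List<String> key = Arrays.asList(m.topic, m.endpoints);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(m);
        }
        List<List<Msg>> chunks = new ArrayList<>();
        for (List<Msg> group : groups.values()) {
            for (int i = 0; i < group.size(); i += chunkSize) {
                int end = Math.min(i + chunkSize, group.size());
                chunks.add(new ArrayList<>(group.subList(i, end)));
            }
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Msg> msgs = Arrays.asList(
                new Msg("A", "e1", "1"), new Msg("A", "e1", "2"), new Msg("A", "e1", "3"),
                new Msg("B", "e1", "4"), new Msg("A", "e2", "5"));
        System.out.println(groupAndChunk(msgs, 2).size()); // prints 4
    }
}
```

Each resulting chunk would then map to one ack request, so no request mixes topics or endpoints.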
   
   #### PushConsumer — Batch Consume Service
   
   - `BatchConsumeService`: accumulates messages from all `ProcessQueue`s into 
a single `BatchBuffer`, flushing to `BatchMessageListener` when any threshold 
is met.
   - `BatchBuffer`: an encapsulated accumulation buffer with lock-based 
concurrency control, a scheduled timeout flush, and a forward-progress guarantee.
   - `BatchConsumeTask`: `Callable` that invokes `BatchMessageListener` with 
interceptor hooks.
   - New `BatchMessageListener` interface and 
`PushConsumerBuilder.setBatchPolicy()/setBatchMessageListener()`.
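
The "flush when any threshold is met" behaviour can be illustrated with a small lock-protected buffer. This is a simplified sketch, not the PR's `BatchBuffer`: `ThresholdBuffer` and its methods are hypothetical, message bodies are plain byte arrays, and the timeout check takes the current time as an argument instead of using a scheduled task, which keeps the example deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ThresholdBuffer {
    private final int maxCount;
    private final long maxBytes;
    private final long maxWaitNanos;
    private final List<byte[]> items = new ArrayList<>();
    private long bytes = 0;
    private long firstArrivalNanos = 0;

    ThresholdBuffer(int maxCount, long maxBytes, long maxWaitNanos) {
        this.maxCount = maxCount;
        this.maxBytes = maxBytes;
        this.maxWaitNanos = maxWaitNanos;
    }

    /** Adds one body; returns the batch if the count or byte threshold is reached, else empty. */
    synchronized List<byte[]> add(byte[] body, long nowNanos) {
        if (items.isEmpty()) {
            firstArrivalNanos = nowNanos; // the wait clock starts with the first buffered message
        }
        items.add(body);
        bytes += body.length;
        if (items.size() >= maxCount || bytes >= maxBytes) {
            return drain();
        }
        return Collections.emptyList();
    }

    /** Returns the buffered batch if the oldest message has waited long enough, else empty. */
    synchronized List<byte[]> flushIfExpired(long nowNanos) {
        if (!items.isEmpty() && nowNanos - firstArrivalNanos >= maxWaitNanos) {
            return drain();
        }
        return Collections.emptyList();
    }

    // Called only while holding the lock.
    private List<byte[]> drain() {
        List<byte[]> batch = new ArrayList<>(items);
        items.clear();
        bytes = 0;
        return batch;
    }

    public static void main(String[] args) {
        ThresholdBuffer buf = new ThresholdBuffer(3, 100, 1000);
        buf.add(new byte[10], 0);
        buf.add(new byte[10], 10);
        System.out.println(buf.add(new byte[10], 20).size()); // prints 3
    }
}
```

In this sketch a single body larger than `maxBytes` is flushed on its own as a batch of one, so an oversized message cannot stall the buffer. Whether that matches the PR's forward-progress guarantee is an assumption.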
   
   #### BatchPolicy
   
   Shared policy with `maxBatchCount`, `maxBatchBytes`, `maxWaitTime`, and 
`maxInflight` (used by SimpleConsumer only). It is constructed through a 
builder that supplies default values.
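
For readers unfamiliar with the pattern, a policy object of this shape might look roughly like the sketch below. It is not the PR's `BatchPolicy`: the class name `BatchPolicySketch`, the setter names, the field types, and above all the default values are placeholders invented for illustration, since the description does not state them.

```java
import java.time.Duration;

final class BatchPolicySketch {
    final int maxBatchCount;
    final long maxBatchBytes;
    final Duration maxWaitTime;
    final int maxInflight; // per the PR description, only SimpleConsumer uses this

    private BatchPolicySketch(Builder b) {
        this.maxBatchCount = b.maxBatchCount;
        this.maxBatchBytes = b.maxBatchBytes;
        this.maxWaitTime = b.maxWaitTime;
        this.maxInflight = b.maxInflight;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        // Placeholder defaults for this sketch only; the PR's real defaults are not stated.
        private int maxBatchCount = 32;
        private long maxBatchBytes = 4L * 1024 * 1024;
        private Duration maxWaitTime = Duration.ofSeconds(5);
        private int maxInflight = 4;

        Builder setMaxBatchCount(int value) {
            requirePositive(value, "maxBatchCount");
            this.maxBatchCount = value;
            return this;
        }

        Builder setMaxBatchBytes(long value) {
            requirePositive(value, "maxBatchBytes");
            this.maxBatchBytes = value;
            return this;
        }

        Builder setMaxWaitTime(Duration value) {
            if (value == null || value.isNegative() || value.isZero()) {
                throw new IllegalArgumentException("maxWaitTime must be positive");
            }
            this.maxWaitTime = value;
            return this;
        }

        Builder setMaxInflight(int value) {
            requirePositive(value, "maxInflight");
            this.maxInflight = value;
            return this;
        }

        BatchPolicySketch build() {
            return new BatchPolicySketch(this);
        }

        private static void requirePositive(long value, String name) {
            if (value <= 0) {
                throw new IllegalArgumentException(name + " must be positive");
            }
        }
    }

    public static void main(String[] args) {
        BatchPolicySketch policy = newBuilder()
                .setMaxBatchCount(100)
                .setMaxWaitTime(Duration.ofMillis(200))
                .build();
        System.out.println(policy.maxBatchCount); // prints 100
    }
}
```

Validating in the setters keeps an invalid policy from ever being built, which is one common reason to prefer a builder over a constructor with four positional arguments.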
   
   ### How Did You Test This Change?
   
   - **BatchingSimpleConsumerImplTest**: 16 unit tests covering 
count/bytes/timeout triggers, adaptive multi-round fetch, maxInflight cap, 
topic-specific batch, inflight-always-complete semantics, error handling, 
direct delegation, no-excess-messages, concurrent calls, close behavior, 
effective invisible duration.
   - **BatchConsumeServiceTest**: 10 unit tests covering single/multi-batch 
flush, timeout flush, close flush, corrupted message discard, interleaved 
ProcessQueues, batch consumption callback.
   - **BatchConsumeTaskTest**: 4 unit tests covering 
success/failure/exception/null-return scenarios with interceptor hooks.
   - **ConsumerImplTest**: tests for batchAck grouping by (topic, endpoints) 
and chunking logic.
   - All tests pass: `mvn test -pl client -Dcheckstyle.skip=true` — 0 failures.
   

