qianye1001 opened a new pull request, #1226: URL: https://github.com/apache/rocketmq-clients/pull/1226
### Which Issue(s) This PR Fixes Fixes #1225 ### Brief Description This PR adds first-class batch support to both **SimpleConsumer** and **PushConsumer** in the Java client, controlled by a shared `BatchPolicy`. #### SimpleConsumer — Adaptive Batch Receive `BatchingSimpleConsumerImpl` is a `SimpleConsumer` decorator that converts `batchReceive()` into adaptive concurrent `receiveAsync()` calls: - **Fully asynchronous**: the core `fetchRound()` method chains futures via `CompletableFuture.thenCompose` — no blocking, no background thread, no local cache. - **Adaptive concurrency**: each round fires `min(ceil(remaining/32), maxInflight)` concurrent receives, evenly distributing the remaining count. - **Deadline from first message**: the `maxWaitTime` clock only starts after the first message arrives. - **Inflight requests always complete**: fired receives are never cancelled; the system waits for all to return before deciding on the next round. - **No excess messages**: the total count requested across concurrent receives equals exactly the remaining count. New interface methods: - `batchReceive(Duration)` / `batchReceiveAsync(Duration)` — round-robin across subscribed topics - `batchReceive(String topic, Duration)` / `batchReceiveAsync(String topic, Duration)` — topic-specific - `batchAck(List<MessageView>)` / `batchAckAsync(List<MessageView>)` — grouped by (topic, endpoints), chunked #### PushConsumer — Batch Consume Service - `BatchConsumeService`: accumulates messages from all `ProcessQueue`s into a single `BatchBuffer`, flushing to `BatchMessageListener` when any threshold is met. - `BatchBuffer`: encapsulated accumulation buffer with lock-based concurrency, scheduled timeout flush, forward-progress guarantee. - `BatchConsumeTask`: `Callable` that invokes `BatchMessageListener` with interceptor hooks. - New `BatchMessageListener` interface and `PushConsumerBuilder.setBatchPolicy()/setBatchMessageListener()`. #### BatchPolicy Shared policy with `maxBatchCount`, `maxBatchBytes`, `maxWaitTime`, `maxInflight` (SimpleConsumer only). Builder pattern with sensible defaults. ### How Did You Test This Change? - **BatchingSimpleConsumerImplTest**: 16 unit tests covering count/bytes/timeout triggers, adaptive multi-round fetch, maxInflight cap, topic-specific batch, inflight-always-complete semantics, error handling, direct delegation, no-excess-messages, concurrent calls, close behavior, effective invisible duration. - **BatchConsumeServiceTest**: 10 unit tests covering single/multi-batch flush, timeout flush, close flush, corrupted message discard, interleaved ProcessQueues, batch consumption callback. - **BatchConsumeTaskTest**: 4 unit tests covering success/failure/exception/null-return scenarios with interceptor hooks. - **ConsumerImplTest**: tests for batchAck grouping by (topic, endpoints) and chunking logic. - All tests pass: `mvn test -pl client -Dcheckstyle.skip=true` — 0 failures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
