qianye1001 opened a new pull request, #1249:
URL: https://github.com/apache/rocketmq-clients/pull/1249

   ## Summary
   
   Add batch message consumption capability to the Java PushConsumer, allowing 
messages to be accumulated in a buffer and delivered in batches based on 
configurable policies.
   
   - **BatchMessageListener**: new public API interface that receives `List<MessageView>` and returns a single `ConsumeResult` for the entire batch
   - **BatchPolicy**: configurable batching strategy with a Builder and defaults of 32 messages, 4 MB, and 5 s. A flush triggers as soon as any one of `maxBatchSize`, `maxBatchBytes`, or `maxWaitTime` is reached
   - **BatchConsumeService**: Standard mode implementation with precise batch splitting (`extractBatch` respects the count and byte limits simultaneously) and a forward-progress guarantee (a single oversized message is still flushed)
   - **FifoBatchConsumeService**: FIFO mode via inheritance, with whole-batch retry semantics and a single-batch-in-flight constraint. On failure, the entire batch is retried until the max attempts are exhausted, then forwarded to the DLQ
   - **ConsumeService** enhanced: added a `close()` lifecycle method, a protected constructor for batch services (no NOOP listener hack), and accessor methods
   - **Config validation** in the builder: `maxCacheMessageCount >= maxBatchSize`, `maxCacheMessageSizeInBytes >= maxBatchBytes`
   - **Comprehensive tests** using Awaitility: 34 new test cases covering Standard/FIFO modes, retry, DLQ, concurrency, forward progress, corrupted message handling, and Builder validation
   - **BatchPushConsumerExample** demonstrating usage
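
   The flush triggers and the splitting rule described in the list above can be illustrated with a small standalone sketch. It is not the code from this PR: `BatchSplitSketch`, `Msg`, and `shouldFlush` are hypothetical names, the `>=` comparisons are an assumption about how the limits are checked, and only the name `extractBatch` and the three limits come from the summary.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, self-contained sketch. None of these types are the PR's real classes.
final class BatchSplitSketch {

    /** Stand-in for a buffered message; only its body size matters here. */
    static final class Msg {
        final String id;
        final long bytes;

        Msg(String id, long bytes) {
            this.id = id;
            this.bytes = bytes;
        }

        @Override
        public String toString() {
            return id;
        }
    }

    /** Assumed trigger rule: flush as soon as any one of the three limits is reached. */
    static boolean shouldFlush(int count, long bytes, long waitedMillis,
                               int maxBatchSize, long maxBatchBytes, long maxWaitMillis) {
        return count >= maxBatchSize || bytes >= maxBatchBytes || waitedMillis >= maxWaitMillis;
    }

    /**
     * Removes messages from the head of the buffer while both the count limit and
     * the byte limit still hold. The first message is always taken, so a single
     * message larger than maxBatchBytes leaves as a batch of one (forward progress).
     */
    static List<Msg> extractBatch(Deque<Msg> buffer, int maxBatchSize, long maxBatchBytes) {
        List<Msg> batch = new ArrayList<>();
        long batchBytes = 0;
        while (!buffer.isEmpty() && batch.size() < maxBatchSize) {
            Msg next = buffer.peekFirst();
            if (!batch.isEmpty() && batchBytes + next.bytes > maxBatchBytes) {
                break; // adding this message would exceed the byte limit
            }
            batch.add(buffer.pollFirst());
            batchBytes += next.bytes;
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<Msg> buffer = new ArrayDeque<>();
        buffer.add(new Msg("a", 10)); // larger than the 8-byte limit used below
        buffer.add(new Msg("b", 3));
        buffer.add(new Msg("c", 3));
        buffer.add(new Msg("d", 3));
        while (!buffer.isEmpty()) {
            System.out.println(extractBatch(buffer, 3, 8));
        }
    }
}
```

   With limits of 3 messages and 8 bytes, the sketch splits the four messages into `[a]`, `[b, c]`, and `[d]`: the oversized `a` goes out alone, and `d` is held back because adding it would push the second batch to 9 bytes.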
   
   ### Key Design Decisions
   
   | Decision | Choice | Rationale |
   |----------|--------|-----------|
   | Batch result type | Single `ConsumeResult` per batch | Simple API; entire batch succeeds or fails together |
   | Buffer scope | Global (mixed across ProcessQueues) | No PQ-based grouping; messages from all sources batch together |
   | FIFO retry | Whole-batch retry | Preserves ordering guarantee; only one batch in-flight at a time |
   | Class hierarchy | `FifoBatchConsumeService extends BatchConsumeService` | Clean separation via inheritance instead of if-else flag |
   | Oversized messages | Forward-progress guarantee | Single message > `maxBatchBytes` still flushes as batch-of-one |
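
   The whole-batch retry decision for FIFO mode can be pictured with the minimal sketch below. It is an illustration under assumptions, not the PR's implementation: `FifoBatchRetrySketch`, `Result`, `Outcome`, and `deliver` are hypothetical names, retry delays are omitted, and treating an exception or a `null` return as a failure is an assumption suggested by the test case names in this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of whole-batch retry; not the PR's FifoBatchConsumeService.
final class FifoBatchRetrySketch {

    enum Result { SUCCESS, FAILURE }

    enum Outcome { ACKED, SENT_TO_DLQ }

    /**
     * Delivers the same batch, in the same order, until the listener reports
     * success or maxAttempts is used up. The loop is synchronous, so only one
     * batch is ever in flight. An exception or a null result counts as a
     * failure (an assumption for this sketch).
     */
    static <T> Outcome deliver(List<T> batch, Function<List<T>, Result> listener, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Result result;
            try {
                result = listener.apply(batch);
            } catch (RuntimeException e) {
                result = Result.FAILURE;
            }
            if (result == Result.SUCCESS) {
                return Outcome.ACKED;
            }
        }
        // Attempts exhausted: the whole batch goes to the dead-letter queue.
        return Outcome.SENT_TO_DLQ;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("m1", "m2", "m3");
        int[] calls = {0};
        // The listener fails twice, then succeeds on the third attempt.
        Outcome outcome = deliver(batch,
                b -> ++calls[0] < 3 ? Result.FAILURE : Result.SUCCESS, 5);
        System.out.println(outcome + " after " + calls[0] + " attempts");
    }
}
```

   Because the batch is retried as a unit, messages inside it are never reordered relative to each other, which is the ordering rationale given in the table.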
   
   ## Test plan
   
   - [x] `BatchPolicyTest`: 15 cases covering constructor validation, Builder defaults, Builder custom values, invalid params
   - [x] `BatchConsumeTaskTest`: 6 cases covering success, failure, exception, null return, unmodifiable list, batch size
   - [x] `BatchConsumeServiceTest`: 13 cases covering maxBatchSize flush, maxBatchBytes flush, maxWaitTime flush, Standard failure, FIFO success, FIFO retry+DLQ, FIFO single-batch-in-flight, graceful shutdown, concurrent PQ submission, corrupted message discard, precise batch splitting, forward-progress, FIFO corrupted discard
   - [x] Full `mvn -B package` passes locally (JDK 11, 223 tests, 0 failures)
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)

