3pacccccc commented on PR #24610: URL: https://github.com/apache/pulsar/pull/24610#issuecomment-3160671052
Thanks for your review, @lhotari. Here are my answers to your questions:
> 1. Please elaborate more on why you think that this contradicts expected behavior of zero-queue consumers?
When a `MessagePayloadProcessor` is configured, the current behavior creates an unpredictable interaction pattern. Here is a detailed breakdown of what happens:
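```java
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.ConsumerImpl;

// topicName, subscriptionName, messagePredicate, totalMessages and CustomProcessor
// (a user-defined MessagePayloadProcessor) come from the surrounding test.
Producer<byte[]> producer = pulsarClient.newProducer()
        .topic(topicName)
        .enableBatching(true)
        .batchingMaxMessages(5)      // up to 5 messages per batch
        .create();

ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
        .topic(topicName)
        .subscriptionName(subscriptionName)
        .messagePayloadProcessor(new CustomProcessor())
        .receiverQueueSize(0)        // zero-queue consumer
        .subscribe();

for (int i = 0; i < totalMessages; i++) {
    String message = messagePredicate + i;
    producer.sendAsync(message.getBytes());
}
producer.flush();

for (int i = 0; i < 10; i++) {
    // May block indefinitely after only a few messages (see below).
    Message<byte[]> receive = consumer.receive();
}
```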
In this case, the following happens step by step:
| Consumer | Broker |
|----------|--------|
| `receive()` is called | |
| Sends 1 flow permit to the broker<sup>[1]</sup> | |
| Blocks waiting for messages | |
| | Receives the flow permit, sends the whole batch of 5 messages to the consumer, and sets `messagePermits` to -4 |
| Receives the 5 messages in a single batch | |
| Processes the batch via `processPayloadByProcessor`<sup>[2]</sup> | |
| Puts the messages into the `incomingMessages` queue | |
| Consumes 1 message | |
| `receive()` is triggered again | |
| Finds messages in `incomingMessages`: possibly 1, possibly more, possibly none, depending entirely on how fast the user's custom processor runs inside `processPayloadByProcessor`<sup>[2]</sup>. Assuming there are 4 messages in `incomingMessages`, it releases all of them<sup>[1]</sup> | |
| Sends 1 flow permit to the broker | |
| Blocks waiting for new messages | |
| | Receives the flow permit but sends nothing, because `messagePermits` only rises to -3 |
[1]: https://github.com/apache/pulsar/blob/994b1bea110ad1fd284c87d6fee3a3ecfa4ea010/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L96
[2]: https://github.com/apache/pulsar/blob/885bf8707e76d1f7d0a25bb5e5f899ed57e1ff32/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1387
As the sequence shows, in this case the zero-queue consumer blocks indefinitely after consuming just 1 message. In other runs it may block only after consuming 2 or more messages, or it may not block at all; this depends entirely on how fast the user's custom `MessagePayloadProcessor` processes the batch.
**Key Problems**:
- Permit mismatch: the broker sends more messages (a whole batch) than the consumer requested permits for
- Hidden buffering: messages accumulate in `incomingMessages` despite `receiverQueueSize=0`
- Unpredictable blocking: behavior depends on processor speed, violating zero-queue guarantees
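To make the timing dependence concrete, here is a minimal single-threaded Java model of the sequence above. It is a sketch under simplifying assumptions, not Pulsar code: `ZeroQueueBatchModel`, `messagesBeforeBlocking` and `alreadyQueued` are hypothetical names; the broker is assumed to debit one permit per message (so one permit for a 5-message batch leaves -4); whatever is already in `incomingMessages` when the second `receive()` starts is released; and every message the processor has not yet enqueued is assumed to arrive while a later `receive()` is waiting for it.

```java
public class ZeroQueueBatchModel {
    static final int BATCH_SIZE = 5; // matches batchingMaxMessages(5) in the reproduction

    /**
     * Hypothetical model, not Pulsar code.
     * alreadyQueued: how many of the 4 remaining batch messages the processor has already put
     * into incomingMessages when the 2nd receive() starts (0..4); that receive() releases them.
     * Returns how many messages the application gets before receive() blocks forever,
     * or -1 if the broker eventually gets enough permits to dispatch the next batch.
     */
    static int messagesBeforeBlocking(int alreadyQueued) {
        int brokerPermits = 1 - BATCH_SIZE;   // 1 permit granted, 5 messages sent: -4
        int received = 1;                     // the 1st receive() returns message 1
        int notYetQueued = (BATCH_SIZE - 1) - alreadyQueued;
        while (true) {
            // Next receive(): anything already queued is released (only the 2nd call
            // finds any in this model), then 1 flow permit is sent to the broker.
            brokerPermits += 1;
            if (brokerPermits > 0) {
                return -1;                    // broker can dispatch a new batch
            }
            if (notYetQueued == 0) {
                return received;              // permits still <= 0 and nothing coming: blocks
            }
            notYetQueued--;                   // the processor enqueues one more message
            received++;                       // ...and this receive() returns it
        }
    }

    public static void main(String[] args) {
        for (int a = 0; a <= 4; a++) {
            System.out.println("alreadyQueued=" + a + " -> " + messagesBeforeBlocking(a));
        }
    }
}
```

Under these assumptions, `alreadyQueued = 4` (the case in the table) yields 1 message before blocking, `alreadyQueued = 1` yields 4, and `alreadyQueued = 0` (a slow processor) never blocks, because the sixth `receive()` finally pushes the broker's permit count above zero.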
> 2. What do you consider as "buffering"?
In my understanding, the fundamental design principle of `ZeroQueueConsumer` (`receiverQueueSize=0`) is to be a **strictly non-buffering consumer** that:
- operates in pure request-response mode, where:
  - each `receive()` call should trigger exactly one network request to the broker, and
  - no messages should be retained client-side between `receive()` calls.
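One way to state this definition precisely is as a check over per-call observations. The sketch below is purely illustrative: `StrictPullCheck`, `ReceiveTrace` and its fields are hypothetical names, and the traces are hand-written to mirror the scenario above rather than collected from a real consumer. It needs Java 16+ for the record.

```java
import java.util.List;

public class StrictPullCheck {
    /** Hypothetical observation of one receive(): requests it sent, messages left buffered after it. */
    record ReceiveTrace(int flowRequestsSent, int messagesBufferedAfter) {}

    /** True only if every receive() sent exactly one request and left nothing buffered client-side. */
    static boolean isStrictlyNonBuffering(List<ReceiveTrace> traces) {
        for (ReceiveTrace t : traces) {
            if (t.flowRequestsSent() != 1 || t.messagesBufferedAfter() != 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // 1st receive() in the batched scenario: one permit sent, but 4 messages remain queued.
        List<ReceiveTrace> batched = List.of(new ReceiveTrace(1, 4));
        // Intended zero-queue behavior: one request and one message per receive(), nothing left over.
        List<ReceiveTrace> strict = List.of(new ReceiveTrace(1, 0), new ReceiveTrace(1, 0));
        System.out.println(isStrictlyNonBuffering(batched)); // false
        System.out.println(isStrictlyNonBuffering(strict));  // true
    }
}
```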
> 3. What is your use case?
We need `ZeroQueueConsumer` to:
1. **Maintain strict pull semantics**: each `receive()` should trigger exactly one broker interaction.
2. **Handle batches predictably**: either reject batches entirely, or process them atomically without intermediate buffering.
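The two acceptable batch-handling options can be sketched as a hypothetical policy applied to each dispatched entry. `BatchPolicySketch`, `BatchPolicy`, `REJECT`, `ATOMIC` and `deliver` are illustrative names, not existing Pulsar settings or APIs; the point is only that in both modes nothing from a batch is left behind in a client-side queue once the call returns.

```java
import java.util.List;

public class BatchPolicySketch {
    /** Hypothetical options for a zero-queue consumer; not a real Pulsar configuration. */
    enum BatchPolicy { REJECT, ATOMIC }

    /** Returns what a single receive() would hand to the application for one dispatched entry. */
    static List<String> deliver(List<String> entryMessages, BatchPolicy policy) {
        if (entryMessages.size() > 1 && policy == BatchPolicy.REJECT) {
            // Option 1: refuse batches outright instead of buffering the extra messages.
            throw new UnsupportedOperationException(
                    "batch of " + entryMessages.size() + " messages rejected by zero-queue consumer");
        }
        // Option 2 (or a non-batched entry): hand over the whole entry at once,
        // so no message is parked in incomingMessages between receive() calls.
        return List.copyOf(entryMessages);
    }

    public static void main(String[] args) {
        List<String> batch = List.of("msg-0", "msg-1", "msg-2", "msg-3", "msg-4");
        System.out.println(deliver(batch, BatchPolicy.ATOMIC).size()); // 5
        try {
            deliver(batch, BatchPolicy.REJECT);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The atomic option keeps one broker interaction per `receive()` at the cost of returning several messages from a single call; the reject option keeps one message per call but cannot consume batched topics.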