3pacccccc commented on PR #24610:
URL: https://github.com/apache/pulsar/pull/24610#issuecomment-3160671052

   Thanks for your review @lhotari . Here's my answer to your questions:
   > 1.Please elaborate more on why you think that this contradicts expected 
behavior of zero-queue consumers?
   
   The current behavior with MessagePayloadProcessor creates an unpredictable 
interaction pattern. Here's a detailed breakdown of what happens:
   ```java
   Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
           .batchingMaxMessages(5)
           .enableBatching(true)
           .create();
   
   ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer()
           .topic(topicName)
           .messagePayloadProcessor(new CustomProcessor())
           .subscriptionName(subscriptionName)
           .receiverQueueSize(0)
           .subscribe();
   
   for (int i = 0; i < totalMessages; i++) {
       String message = messagePredicate + i;
       producer.sendAsync(message.getBytes());
   }
   producer.flush();
   for (int i = 0; i < 10; i++) {
       Message<byte[]> receive = consumer.receive();
   }
   ```
   
   In this case, here's the step of what will happen:
   
   | consumer                                                                   
                                                                                
                                                                                
                                            | broker                            
                                                  |
   
|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------|
   | receive()  called                                                          
                                                                                
                                                                                
                                            |                                   
                                                  |
   | send 1 flow-permits to broker<sup>[1]</sup>                                
                                                                                
                                                                                
                                            |                                   
                                                  |
   | Blocks waiting for messages                                                
                                                                                
                                                                                
                                            |                                   
                                                  |
   |                                                                            
                                                                                
                                                                                
                                            | receive flow-permits, and send 5 
messages to broker, set ```messagePermits``` as -4 |
   | receive 5 messages in a batch                                              
                                                                                
                                                                                
                                            |                                   
                                                  |
   | processed batch via ```processPayloadByProcessor```<sup>[2]</sup>          
                                                                                
                                                                                
                                            |                                   
                                                  |
   | put messages into ```incomingMessages``` queue                             
                                                                                
                                                                                
                                            |                                   
                                                  |
   | consume 1 message.                                                         
                                                                                
                                                                                
                                            |                                   
                                                  |
   | trigger one more receive()                                                 
                                                                                
                                                                                
                                            |                                   
                                                  |
   | find  messages in the incomingMessages, maybe 1 message or more, maybe no 
message, totally depends on the process speed of user's customize 
```processPayloadByProcessor```<sup>[2]</sup>, let's assume there's 4 message's 
in the ```incomingQueue```, release them all<sup>[1]</sup> |                    
                                                                 |
   | send 1 flow-permits to broker                                              
                                                                                
                                                                                
                                            |                                   
                                                  |
   | block and waiting for new messages coming                                  
                                                                                
                                                                                
                                            |                                   
                                                  |
   |                                                                            
                                                                                
                                                                                
                                            | receive flow-permits, don't send 
message, set ```messagePermits``` as -3            |
   
   
[1]:https://github.com/apache/pulsar/blob/994b1bea110ad1fd284c87d6fee3a3ecfa4ea010/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L96
   
[2]:https://github.com/apache/pulsar/blob/885bf8707e76d1f7d0a25bb5e5f899ed57e1ff32/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1387
   so, as you can see, this ```zeroQueueConsumer``` will be blocked 
indefinitely in this case after consume 1 message. and maybe after consume 2 
messages or more, and even won't get blocked, this totally depends on the 
process speed of user's customize ```MessagePayloadProcessor```.
   
   **Key Problems**:
   - Permit Mismatch: The broker sends more messages (batches) than requested
   
   - Hidden Buffering: Messages accumulate in incomingMessages despite 
queueSize=0
   
   - Unpredictable Blocking: Behavior depends on processor speed, violating 
zero-queue guarantees
   
   > 2.What do you consider as "buffering"?
   
   In my understanding, the fundamental design principle of 
```ZeroQueueConsumer``` (receiverQueueSize=0) is to be a **strictly 
non-buffering consumer** that:
   
   - Operates in pure request-response mode:
   
     - Each receive() call should trigger exactly one network request to the 
broker
   
     -  No messages should be retained client-side between receive() calls
   > 3.What is your use case?
   
   We need ZeroQueueConsumer to:
   
   1.**Maintain Strict Pull Semantics**: Each receive() should trigger exactly 
one broker interaction
   
   2.**Handle Batches Predictably**: Either:Reject batches entirely, or Process 
them atomically without intermediate buffering


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to