RobertIndie opened a new issue, #21838:
URL: https://github.com/apache/pulsar/issues/21838

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   b944f10d2a9bac2fbe19dc83bf250a276f1ed4d1
   
   ### Minimal reproduce step
   
   Here is the reproduction code:
   ```java
   PulsarAdmin admin = buildAdminClient();
           String topic = "test";
           admin.topics().createPartitionedTopic(topic, 1);
   
           Consumer<byte[]> consumer = pulsarClient.newConsumer()
                   .topic(topic)
                   .subscriptionName("sub")
                   .subscriptionType(SubscriptionType.Shared)
                   
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .maxTotalReceiverQueueSizeAcrossPartitions(10)
                   .receiverQueueSize(10)
                   .consumerName("A")
                   .subscribe();
   
           Producer<byte[]> producer = pulsarClient.newProducer()
                   .topic(topic)
                   .enableBatching(false)
                   .create();
   
           for (int i = 0; i < 20; i++) {
               producer.send(("test" + i).getBytes());
           }
   
           Message<byte[]> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
           log.info("Received message: {}", msg.getMessageId());
   
           Thread.sleep(1000);
   
           Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
                   .topic(topic)
                   .subscriptionName("sub")
                   .subscriptionType(SubscriptionType.Shared)
                   
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .maxTotalReceiverQueueSizeAcrossPartitions(10)
                   .receiverQueueSize(10)
                   .consumerName("B")
                   .subscribe();
   
           msg = consumer2.receive(1000, TimeUnit.MILLISECONDS);
           Assert.assertNotNull(msg);
           log.info("Received message: {}", msg.getMessageId());
   ```
   
   ### What did you expect to see?
   
   According to the documentation for the  
`maxTotalReceiverQueueSizeAcrossPartitions`:
   
   > The purpose of this setting is to have an upper-limit on the number
        of messages that a consumer can be pushed at once from a broker, across 
all
        the partitions.
   
   From my understandings, `maxTotalReceiverQueueSizeAcrossPartitions` should 
be able to control the max number of all cached messages in the consumer. 
Therefore, looking at the above code, the consumer A should only be able to 
cache a maximum of 10 messages. The consumer B should be able to receive a new 
message.
   
   
   
   ### What did you see instead?
   
   But consumer A has actually cached all 20 messages in the receiver 
queue(receiverQueueSize+maxTotalReceiverQueueSizeAcrossPartitions). Consumer B 
can't receive the messages. So the above test would fail.
   
   ### Anything else?
   
   From the implementation, the maximum number of messages actually cached by 
multi-topics consumer is approximately 
receiverQueueSize+maxTotalReceiverQueueSizeAcrossPartitions.This seems to go 
against the case described in the documentation.
   
   
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to