GitHub user Debashish-Mallick edited a discussion: Regarding receiverQueueSize 
and mutitopic consumer behavior

Hi Everyone,

Pulsar allowed receiveQ size as 0 for a single topic consumer .

We have the following scenario.
Assume that we have 5 topics. We want to have a single Shared Subscription 
consumer across the 5 topics to consume the message in round robin fashion 
across the 5 topics. We do not want the consumer to hold the message in receive 
queue. The consumer needs to pick the message only it intends to handle and 
handle over the message to some other process. In my case, the number of topics 
5 is not constant. It can keep changing. To handle the above situation with 
multi-topic consumer, when a new topic gets added, I am trying to close the 
existing consumer and create a new consumer with additional topics. Also, 
please note that the topics will be across the pulsar tenants (i.e, the topics 
will not be from same single pulsar tenant).


I am trying to use multitopic consumer and setting receiverQueueSize as 1 (as 
this queue size should be >0). then after some time I am creating one more new 
consumer with the newly added topic from different tenant, and started 
receiving message from newly created consumer .
Below are the observations , need to know weather this is valid behavior in 
pulsar.

With the new consumer messages are receiving properly for newly added topic.
But for old consumer we have 3 messages left ( checked availablePermits for old 
consumer , It was showing 0) , 
and once we close the old consumer , these 3 messages started consuming in new 
consumer with the same message id with the redelivery count as 1.

Tried with new pulsar client and new consumer name while creation of new 
consumer , behavior is same.

`allTopicsConsumerOld = pulsarClient.newConsumer()
        .topic(**"pulsartenant_1/default/test"**) // this is for multiconsumer
        .consumerName("my-consumer")
        .subscriptionName("my-sub")
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .receiverQueueSize(1)
        .subscribe();
allTopicsConsumerNew = pulsarClient.newConsumer()
        .topic(**"pulsartenant_1/default/test","pulsartenant_2/default/test"**) 
// this is for multiconsumer
        .consumerName("my-consumer")
        .subscriptionName("my-sub")
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .receiverQueueSize(1)
        .subscribe();`

GitHub link: https://github.com/apache/pulsar/discussions/22317

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to