GitHub user Debashish-Mallick edited a discussion: Regarding receiverQueueSize
and mutitopic consumer behavior
Hi Everyone,
Pulsar allowed receiveQ size as 0 for a single topic consumer .
We have the following scenario.
Assume that we have 5 topics. We want to have a single Shared Subscription
consumer across the 5 topics to consume the message in round robin fashion
across the 5 topics. We do not want the consumer to hold the message in receive
queue. The consumer needs to pick the message only it intends to handle and
handle over the message to some other process. In my case, the number of topics
5 is not constant. It can keep changing. To handle the above situation with
multi-topic consumer, when a new topic gets added, I am trying to close the
existing consumer and create a new consumer with additional topics. Also,
please note that the topics will be across the pulsar tenants (i.e, the topics
will not be from same single pulsar tenant).
I am trying to use multitopic consumer and setting receiverQueueSize as 1 (as
this queue size should be >0). then after some time I am creating one more new
consumer with the newly added topic from different tenant, and started
receiving message from newly created consumer .
Below are the observations , need to know weather this is valid behavior in
pulsar.
With the new consumer messages are receiving properly for newly added topic.
But for old consumer we have 3 messages left ( checked availablePermits for old
consumer , It was showing 0) ,
and once we close the old consumer , these 3 messages started consuming in new
consumer with the same message id with the redelivery count as 1.
Tried with new pulsar client and new consumer name while creation of new
consumer , behavior is same.
`allTopicsConsumerOld = pulsarClient.newConsumer()
.topic(**"pulsartenant_1/default/test"**) // this is for multiconsumer
.consumerName("my-consumer")
.subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(1)
.subscribe();
allTopicsConsumerNew = pulsarClient.newConsumer()
.topic(**"pulsartenant_1/default/test","pulsartenant_2/default/test"**)
// this is for multiconsumer
.consumerName("my-consumer")
.subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(1)
.subscribe();`
GitHub link: https://github.com/apache/pulsar/discussions/22317
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]