This is an automated email from the ASF dual-hosted git repository.

BewareMyPower pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new fe335ded [feat] Add consumer PriorityLevel support (#1487)
fe335ded is described below

commit fe335dedc800c6b8e10ba4462a88c6ec6050ad69
Author: grishaf <[email protected]>
AuthorDate: Mon May 11 13:16:45 2026 +0300

    [feat] Add consumer PriorityLevel support (#1487)
---
 pulsar/consumer.go           |  20 ++++
 pulsar/consumer_impl.go      |   6 ++
 pulsar/consumer_partition.go |   3 +-
 pulsar/consumer_test.go      | 211 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 239 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 7996335a..f7005543 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -132,6 +132,26 @@ type ConsumerOptions struct {
        // This argument is required when subscribing
        SubscriptionName string
 
+       // PriorityLevel sets the priority level for this consumer.
+       //
+       // Shared subscription:
+       // Sets priority level for the consumer to determine which consumers 
the broker
+       // prioritizes when dispatching messages. The broker follows descending 
priorities
+       // (0 = max priority, 1, 2, ...).
+       // The broker first dispatches messages to max priority consumers if 
they have
+       // permits, otherwise considers next priority level consumers.
+       // e.g. if consumer-A has priorityLevel 0 and consumer-B has 
priorityLevel 1,
+       // the broker dispatches messages to only consumer-A until it runs out 
of permits,
+       // then starts dispatching to consumer-B.
+       //
+       // Failover subscription (partitioned topics only):
+       // The broker selects the active consumer based on priority level and 
lexicographic
+       // sorting of consumer name. Higher priority (lower number) consumers 
are preferred.
+       // Priority level has no effect on failover subscriptions for 
non-partitioned topics.
+       //
+       // Default is 0 (max priority).
+       PriorityLevel int
+
        // Properties represents a set of application defined properties for 
the consumer.
        // Those properties will be visible in the topic stats
        Properties map[string]string
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 08c825e3..ece37370 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -21,6 +21,7 @@ import (
        "context"
        "errors"
        "fmt"
+       "math"
        "math/rand"
        "strconv"
        "sync"
@@ -84,6 +85,10 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                return nil, newError(SubscriptionNotFound, "subscription name 
is required for consumer")
        }
 
+       if options.PriorityLevel < 0 || options.PriorityLevel > math.MaxInt32 {
+               return nil, newError(InvalidConfiguration, "priority level must 
be >= 0 and <= math.MaxInt32")
+       }
+
        if options.ReceiverQueueSize <= 0 {
                options.ReceiverQueueSize = defaultReceiverQueueSize
        }
@@ -455,6 +460,7 @@ func newPartitionConsumerOpts(topic, consumerName string, 
idx int, options Consu
                subscriptionType:            options.Type,
                subscriptionInitPos:         
options.SubscriptionInitialPosition,
                partitionIdx:                idx,
+               priorityLevel:               options.PriorityLevel,
                receiverQueueSize:           options.ReceiverQueueSize,
                nackRedeliveryDelay:         nackRedeliveryDelay,
                nackBackoffPolicy:           options.NackBackoffPolicy,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e63d4b39..8c6d8916 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -100,6 +100,7 @@ type partitionConsumerOpts struct {
        subscriptionType            SubscriptionType
        subscriptionInitPos         SubscriptionInitialPosition
        partitionIdx                int
+       priorityLevel               int
        receiverQueueSize           int
        autoReceiverQueueSize       bool
        nackRedeliveryDelay         time.Duration
@@ -2165,7 +2166,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL 
string) error {
                ConsumerId:                 proto.Uint64(pc.consumerID),
                RequestId:                  proto.Uint64(requestID),
                ConsumerName:               proto.String(pc.name),
-               PriorityLevel:              nil,
+               PriorityLevel:              
proto.Int32(int32(pc.options.priorityLevel)),
                Durable:                    
proto.Bool(pc.options.subscriptionMode == Durable),
                Metadata:                   
internal.ConvertFromStringMap(pc.options.metadata),
                SubscriptionProperties:     
internal.ConvertFromStringMap(pc.options.subProperties),
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 0d710ead..4e232b8a 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -228,6 +228,217 @@ func TestConsumerWithInvalidConf(t *testing.T) {
        assert.Equal(t, err.(*Error).Result(), TopicNotFound)
 }
 
+func TestConsumerWithInvalidPriorityLevel(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            "my-topic",
+               SubscriptionName: "my-sub",
+               PriorityLevel:    -1,
+       })
+
+       assert.Nil(t, consumer)
+       assert.NotNil(t, err)
+       assert.Equal(t, err.(*Error).Result(), InvalidConfiguration)
+}
+
+func TestPriorityConsumer(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       sub := "sub-shared-priority"
+
+       // High-priority consumers (priority 1)
+       consumer1, err := client.Subscribe(ConsumerOptions{
+               Topic:             topic,
+               SubscriptionName:  sub,
+               Type:              Shared,
+               ReceiverQueueSize: 5,
+               PriorityLevel:     1,
+       })
+       assert.Nil(t, err)
+       defer consumer1.Close()
+
+       consumer2, err := client.Subscribe(ConsumerOptions{
+               Topic:             topic,
+               SubscriptionName:  sub,
+               Type:              Shared,
+               ReceiverQueueSize: 5,
+               PriorityLevel:     1,
+       })
+       assert.Nil(t, err)
+       defer consumer2.Close()
+
+       // Low-priority consumer (priority 2)
+       consumer3, err := client.Subscribe(ConsumerOptions{
+               Topic:             topic,
+               SubscriptionName:  sub,
+               Type:              Shared,
+               ReceiverQueueSize: 5,
+               PriorityLevel:     2,
+       })
+       assert.Nil(t, err)
+       defer consumer3.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       for i := 0; i < 10; i++ {
+               _, err := producer.Send(context.Background(), &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.Nil(t, err)
+       }
+
+       // Drain permits from consumer1 and consumer2
+       for i := 0; i < 5; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
2*time.Second)
+               msg, err := consumer1.Receive(ctx)
+               cancel()
+               assert.Nil(t, err)
+               assert.NotNil(t, msg)
+       }
+       for i := 0; i < 5; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
2*time.Second)
+               msg, err := consumer2.Receive(ctx)
+               cancel()
+               assert.Nil(t, err)
+               assert.NotNil(t, msg)
+       }
+
+       // Low-priority consumer should not have received any messages
+       ctx, cancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+       msg, err := consumer3.Receive(ctx)
+       cancel()
+       assert.NotNil(t, err)
+       assert.Nil(t, msg)
+}
+
+func TestFailOverConsumerPriority(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + 
randomName + "/partitions"
+       makeHTTPCall(t, http.MethodPut, testURL, "9")
+
+       sub := "my-sub"
+
+       // C1 at priority 1
+       consumer1, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Name:             "aaa",
+               SubscriptionName: sub,
+               Type:             Failover,
+               PriorityLevel:    1,
+       })
+       assert.Nil(t, err)
+       defer consumer1.Close()
+
+       // C2 at priority 0 — should take over from C1
+       consumer2, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Name:             "bbb1",
+               SubscriptionName: sub,
+               Type:             Failover,
+               PriorityLevel:    0,
+       })
+       assert.Nil(t, err)
+       defer consumer2.Close()
+
+       // C3 at priority 0
+       consumer3, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Name:             "bbb2",
+               SubscriptionName: sub,
+               Type:             Failover,
+               PriorityLevel:    0,
+       })
+       assert.Nil(t, err)
+       defer consumer3.Close()
+
+       // C4 at priority 0
+       consumer4, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Name:             "bbb3",
+               SubscriptionName: sub,
+               Type:             Failover,
+               PriorityLevel:    0,
+       })
+       assert.Nil(t, err)
+       defer consumer4.Close()
+
+       // C5 at priority 1 — should not get any partitions
+       consumer5, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               Name:             "bbb4",
+               SubscriptionName: sub,
+               Type:             Failover,
+               PriorityLevel:    1,
+       })
+       assert.Nil(t, err)
+       defer consumer5.Close()
+
+       evenDistribution := 9 / 3 // 3 partitions per priority-0 consumer
+
+       topicName, err := utils.GetTopicName(topic)
+       assert.Nil(t, err)
+
+       cfg := &config.Config{}
+       pulsarAdmin, err := pulsaradmin.NewClient(cfg)
+       assert.NoError(t, err)
+
+       // Poll admin stats until partitions are evenly distributed among 
priority-0 consumers
+       retryAssert(t, 20, 500, func() {}, func(_ assert.TestingT) bool {
+               stats, err := 
pulsarAdmin.Topics().GetPartitionedStats(*topicName, true)
+               if err != nil {
+                       return false
+               }
+               counts := map[string]int{}
+               for _, pStats := range stats.Partitions {
+                       subStats, ok := pStats.Subscriptions[sub]
+                       if !ok {
+                               return false
+                       }
+                       counts[subStats.ActiveConsumerName]++
+               }
+               return len(counts) == 3 &&
+                       counts["bbb1"] == evenDistribution &&
+                       counts["bbb2"] == evenDistribution &&
+                       counts["bbb3"] == evenDistribution
+       })
+
+       // Final assertion with real test failure
+       stats, err := pulsarAdmin.Topics().GetPartitionedStats(*topicName, true)
+       assert.Nil(t, err)
+       counts := map[string]int{}
+       for _, pStats := range stats.Partitions {
+               subStats := pStats.Subscriptions[sub]
+               counts[subStats.ActiveConsumerName]++
+       }
+       assert.Equal(t, 3, len(counts))
+       assert.Equal(t, evenDistribution, counts["bbb1"])
+       assert.Equal(t, evenDistribution, counts["bbb2"])
+       assert.Equal(t, evenDistribution, counts["bbb3"])
+}
+
 func TestConsumerSubscriptionEarliestPosition(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,

Reply via email to