This is an automated email from the ASF dual-hosted git repository.
BewareMyPower pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new fe335ded [feat] Add consumer PriorityLevel support (#1487)
fe335ded is described below
commit fe335dedc800c6b8e10ba4462a88c6ec6050ad69
Author: grishaf <[email protected]>
AuthorDate: Mon May 11 13:16:45 2026 +0300
[feat] Add consumer PriorityLevel support (#1487)
---
pulsar/consumer.go | 20 ++++
pulsar/consumer_impl.go | 6 ++
pulsar/consumer_partition.go | 3 +-
pulsar/consumer_test.go | 211 +++++++++++++++++++++++++++++++++++++++++++
4 files changed, 239 insertions(+), 1 deletion(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 7996335a..f7005543 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -132,6 +132,26 @@ type ConsumerOptions struct {
// This argument is required when subscribing
SubscriptionName string
+ // PriorityLevel sets the priority level for this consumer.
+ //
+ // Shared subscription:
+ // Sets priority level for the consumer to determine which consumers the broker
+ // prioritizes when dispatching messages. The broker follows descending priorities
+ // (0 = max priority, 1, 2, ...).
+ // The broker first dispatches messages to max priority consumers if they have
+ // permits, otherwise considers next priority level consumers.
+ // e.g. if consumer-A has priorityLevel 0 and consumer-B has priorityLevel 1,
+ // the broker dispatches messages to only consumer-A until it runs out of permits,
+ // then starts dispatching to consumer-B.
+ //
+ // Failover subscription (partitioned topics only):
+ // The broker selects the active consumer based on priority level and lexicographic
+ // sorting of consumer name. Higher priority (lower number) consumers are preferred.
+ // Priority level has no effect on failover subscriptions for non-partitioned topics.
+ //
+ // Default is 0 (max priority).
+ PriorityLevel int
+
// Properties represents a set of application defined properties for the consumer.
// Those properties will be visible in the topic stats
Properties map[string]string
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 08c825e3..ece37370 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
+ "math"
"math/rand"
"strconv"
"sync"
@@ -84,6 +85,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
}
+ if options.PriorityLevel < 0 || options.PriorityLevel > math.MaxInt32 {
+ return nil, newError(InvalidConfiguration, "priority level must be >= 0 and <= math.MaxInt32")
+ }
+
if options.ReceiverQueueSize <= 0 {
options.ReceiverQueueSize = defaultReceiverQueueSize
}
@@ -455,6 +460,7 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu
subscriptionType: options.Type,
subscriptionInitPos: options.SubscriptionInitialPosition,
partitionIdx: idx,
+ priorityLevel: options.PriorityLevel,
receiverQueueSize: options.ReceiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: options.NackBackoffPolicy,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e63d4b39..8c6d8916 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -100,6 +100,7 @@ type partitionConsumerOpts struct {
subscriptionType SubscriptionType
subscriptionInitPos SubscriptionInitialPosition
partitionIdx int
+ priorityLevel int
receiverQueueSize int
autoReceiverQueueSize bool
nackRedeliveryDelay time.Duration
@@ -2165,7 +2166,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
ConsumerId: proto.Uint64(pc.consumerID),
RequestId: proto.Uint64(requestID),
ConsumerName: proto.String(pc.name),
- PriorityLevel: nil,
+ PriorityLevel: proto.Int32(int32(pc.options.priorityLevel)),
Durable: proto.Bool(pc.options.subscriptionMode == Durable),
Metadata: internal.ConvertFromStringMap(pc.options.metadata),
SubscriptionProperties: internal.ConvertFromStringMap(pc.options.subProperties),
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 0d710ead..4e232b8a 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -228,6 +228,217 @@ func TestConsumerWithInvalidConf(t *testing.T) {
assert.Equal(t, err.(*Error).Result(), TopicNotFound)
}
+func TestConsumerWithInvalidPriorityLevel(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: "my-topic",
+ SubscriptionName: "my-sub",
+ PriorityLevel: -1,
+ })
+
+ assert.Nil(t, consumer)
+ assert.NotNil(t, err)
+ assert.Equal(t, err.(*Error).Result(), InvalidConfiguration)
+}
+
+func TestPriorityConsumer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ sub := "sub-shared-priority"
+
+ // High-priority consumers (priority 1)
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer1.Close()
+
+ consumer2, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer2.Close()
+
+ // Low-priority consumer (priority 2)
+ consumer3, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ Type: Shared,
+ ReceiverQueueSize: 5,
+ PriorityLevel: 2,
+ })
+ assert.Nil(t, err)
+ defer consumer3.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ for i := 0; i < 10; i++ {
+ _, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ // Drain permits from consumer1 and consumer2
+ for i := 0; i < 5; i++ {
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ msg, err := consumer1.Receive(ctx)
+ cancel()
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ }
+ for i := 0; i < 5; i++ {
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ msg, err := consumer2.Receive(ctx)
+ cancel()
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ }
+
+ // Low-priority consumer should not have received any messages
+ ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ msg, err := consumer3.Receive(ctx)
+ cancel()
+ assert.NotNil(t, err)
+ assert.Nil(t, msg)
+}
+
+func TestFailOverConsumerPriority(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + randomName + "/partitions"
+ makeHTTPCall(t, http.MethodPut, testURL, "9")
+
+ sub := "my-sub"
+
+ // C1 at priority 1
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: "aaa",
+ SubscriptionName: sub,
+ Type: Failover,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer1.Close()
+
+ // C2 at priority 0 — should take over from C1
+ consumer2, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: "bbb1",
+ SubscriptionName: sub,
+ Type: Failover,
+ PriorityLevel: 0,
+ })
+ assert.Nil(t, err)
+ defer consumer2.Close()
+
+ // C3 at priority 0
+ consumer3, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: "bbb2",
+ SubscriptionName: sub,
+ Type: Failover,
+ PriorityLevel: 0,
+ })
+ assert.Nil(t, err)
+ defer consumer3.Close()
+
+ // C4 at priority 0
+ consumer4, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: "bbb3",
+ SubscriptionName: sub,
+ Type: Failover,
+ PriorityLevel: 0,
+ })
+ assert.Nil(t, err)
+ defer consumer4.Close()
+
+ // C5 at priority 1 — should not get any partitions
+ consumer5, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: "bbb4",
+ SubscriptionName: sub,
+ Type: Failover,
+ PriorityLevel: 1,
+ })
+ assert.Nil(t, err)
+ defer consumer5.Close()
+
+ evenDistribution := 9 / 3 // 3 partitions per priority-0 consumer
+
+ topicName, err := utils.GetTopicName(topic)
+ assert.Nil(t, err)
+
+ cfg := &config.Config{}
+ pulsarAdmin, err := pulsaradmin.NewClient(cfg)
+ assert.NoError(t, err)
+
+ // Poll admin stats until partitions are evenly distributed among priority-0 consumers
+ retryAssert(t, 20, 500, func() {}, func(_ assert.TestingT) bool {
+ stats, err := pulsarAdmin.Topics().GetPartitionedStats(*topicName, true)
+ if err != nil {
+ return false
+ }
+ counts := map[string]int{}
+ for _, pStats := range stats.Partitions {
+ subStats, ok := pStats.Subscriptions[sub]
+ if !ok {
+ return false
+ }
+ counts[subStats.ActiveConsumerName]++
+ }
+ return len(counts) == 3 &&
+ counts["bbb1"] == evenDistribution &&
+ counts["bbb2"] == evenDistribution &&
+ counts["bbb3"] == evenDistribution
+ })
+
+ // Final assertion with real test failure
+ stats, err := pulsarAdmin.Topics().GetPartitionedStats(*topicName, true)
+ assert.Nil(t, err)
+ counts := map[string]int{}
+ for _, pStats := range stats.Partitions {
+ subStats := pStats.Subscriptions[sub]
+ counts[subStats.ActiveConsumerName]++
+ }
+ assert.Equal(t, 3, len(counts))
+ assert.Equal(t, evenDistribution, counts["bbb1"])
+ assert.Equal(t, evenDistribution, counts["bbb2"])
+ assert.Equal(t, evenDistribution, counts["bbb3"])
+}
+
func TestConsumerSubscriptionEarliestPosition(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,