This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new cc185904 Implement ReconsumeLater on regex consumer (#1456)
cc185904 is described below
commit cc185904de7b42dc97193188874ce6605ff2d560
Author: Jordan Fitzgerald <[email protected]>
AuthorDate: Sun Jan 18 04:37:32 2026 -0700
Implement ReconsumeLater on regex consumer (#1456)
---
pulsar/consumer_multitopic.go | 56 +++++++++++++++++++++--------------
pulsar/consumer_regex.go | 20 ++++++++++---
pulsar/consumer_regex_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++
3 files changed, 118 insertions(+), 26 deletions(-)
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 7f5e4627..3e4c8ebd 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -30,6 +30,39 @@ import (
"github.com/apache/pulsar-client-go/pulsar/log"
)
+func reconsumeLaterWithMultipleTopics(
+ consumers map[string]Consumer,
+ log log.Logger,
+ msg Message,
+ customProperties map[string]string,
+ delay time.Duration,
+) {
+ names, err := validateTopicNames(msg.Topic())
+ if err != nil {
+ log.Errorf("validate msg topic %q failed: %v", msg.Topic(), err)
+ return
+ }
+ if len(names) != 1 {
+ log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(),
names)
+ return
+ }
+
+ tn := names[0]
+ fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
+
+ consumer, ok := consumers[fqdnTopic]
+ if !ok {
+ // check to see if the topic with the partition part is in the
consumers
+ // this can happen when the consumer is configured to consume
from a specific partition
+ if consumer, ok = consumers[tn.Name]; !ok {
+ log.Warnf("consumer of topic %s not exist
unexpectedly", msg.Topic())
+ return
+ }
+ }
+
+ consumer.ReconsumeLaterWithCustomProperties(msg, customProperties,
delay)
+}
+
type multiTopicConsumer struct {
client *client
@@ -259,28 +292,7 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message,
delay time.Duration) {
func (c *multiTopicConsumer) ReconsumeLaterWithCustomProperties(msg Message,
customProperties map[string]string,
delay time.Duration) {
- names, err := validateTopicNames(msg.Topic())
- if err != nil {
- c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(),
err)
- return
- }
- if len(names) != 1 {
- c.log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(),
names)
- return
- }
-
- tn := names[0]
- fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
- consumer, ok := c.consumers[fqdnTopic]
- if !ok {
- // check to see if the topic with the partition part is in the
consumers
- // this can happen when the consumer is configured to consume
from a specific partition
- if consumer, ok = c.consumers[tn.Name]; !ok {
- c.log.Warnf("consumer of topic %s not exist
unexpectedly", msg.Topic())
- return
- }
- }
- consumer.ReconsumeLaterWithCustomProperties(msg, customProperties,
delay)
+ reconsumeLaterWithMultipleTopics(c.consumers, c.log, msg,
customProperties, delay)
}
func (c *multiTopicConsumer) Nack(msg Message) {
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index ced770e9..2548eaa7 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"regexp"
+ "slices"
"strings"
"sync"
"time"
@@ -186,12 +187,15 @@ func (c *regexConsumer) Ack(msg Message) error {
return c.AckID(msg.ID())
}
-func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) {
- c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
+func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+ c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
}
-func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _
map[string]string, _ time.Duration) {
- c.log.Warnf("regexp consumer not support
ReconsumeLaterWithCustomProperties yet.")
+func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message,
customProperties map[string]string,
+ delay time.Duration) {
+ c.consumersLock.Lock()
+ defer c.consumersLock.Unlock()
+ reconsumeLaterWithMultipleTopics(c.consumers, c.log, msg,
customProperties, delay)
}
// AckID the consumption of a single message, identified by its MessageID
@@ -454,6 +458,14 @@ func (c *regexConsumer) topics() ([]string, error) {
}
filtered := filterTopics(topics, c.pattern)
+
+ if c.options.RetryEnable && c.options.DLQ != nil {
+ retryTopic := c.options.DLQ.RetryLetterTopic
+ if retryTopic != "" && !slices.Contains(filtered, retryTopic) {
+ filtered = append(filtered, retryTopic)
+ }
+ }
+
return filtered, nil
}
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index e1e2ca29..35ba1c28 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
+ "github.com/google/uuid"
"github.com/stretchr/testify/assert"
@@ -513,3 +514,70 @@ func TestRegexTopicGetLastMessageIDs(t *testing.T) {
assert.Equal(t, 1, len(messages))
}
}
+
+func TestRegexConsumerReconsumeLater(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic1 := fmt.Sprintf("regex-reconsume-topic-%v", uuid.NewString())
+ assert.Nil(t, createPartitionedTopic(topic1, 1))
+
+ topic2 := fmt.Sprintf("regex-reconsume-topic-%v", uuid.NewString())
+ assert.Nil(t, createPartitionedTopic(topic2, 1))
+
+ topics := []string{topic1, topic2}
+
+ // create consumer
+ topicsPattern := "persistent://public/default/regex-reconsume-topic-.*"
+ consumer, err := client.Subscribe(ConsumerOptions{
+ TopicsPattern: topicsPattern,
+ SubscriptionName: "my-sub",
+ Type: Shared,
+ RetryEnable: true,
+ DLQ: &DLQPolicy{
+ MaxDeliveries: 2,
+ },
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // produce messages
+ for i, topic := range topics {
+ p, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ if !assert.Nil(t, err) {
+ t.Fatal()
+ }
+
+ err = genMessages(p, 1, func(_ int) string {
+ return fmt.Sprintf("topic-%d-hello", i+1)
+ })
+ assert.Nil(t, err)
+
+ p.Close()
+ }
+
+ // we should receive one message on each topic, then one retry for each
of those
+ for range 2 * len(topics) {
+ func() {
+ ctx, cancel :=
context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ msg, err := consumer.Receive(ctx)
+ if !assert.Nil(t, err) {
+ t.Fatal()
+ }
+
+ if strings.HasSuffix(msg.Topic(), RetryTopicSuffix) {
+ assert.Nil(t, consumer.Ack(msg))
+ } else {
+ consumer.ReconsumeLater(msg, time.Second)
+ }
+ }()
+ }
+}