This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new cc185904 Implement ReconsumeLater on regex consumer (#1456)
cc185904 is described below

commit cc185904de7b42dc97193188874ce6605ff2d560
Author: Jordan Fitzgerald <[email protected]>
AuthorDate: Sun Jan 18 04:37:32 2026 -0700

    Implement ReconsumeLater on regex consumer (#1456)
---
 pulsar/consumer_multitopic.go | 56 +++++++++++++++++++++--------------
 pulsar/consumer_regex.go      | 20 ++++++++++---
 pulsar/consumer_regex_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 118 insertions(+), 26 deletions(-)

diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 7f5e4627..3e4c8ebd 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -30,6 +30,39 @@ import (
        "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
+func reconsumeLaterWithMultipleTopics(
+       consumers map[string]Consumer,
+       log log.Logger,
+       msg Message,
+       customProperties map[string]string,
+       delay time.Duration,
+) {
+       names, err := validateTopicNames(msg.Topic())
+       if err != nil {
+               log.Errorf("validate msg topic %q failed: %v", msg.Topic(), err)
+               return
+       }
+       if len(names) != 1 {
+               log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(), names)
+               return
+       }
+
+       tn := names[0]
+       fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
+
+       consumer, ok := consumers[fqdnTopic]
+       if !ok {
+               // check to see if the topic with the partition part is in the consumers
+               // this can happen when the consumer is configured to consume from a specific partition
+               if consumer, ok = consumers[tn.Name]; !ok {
+                       log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic())
+                       return
+               }
+       }
+
+       consumer.ReconsumeLaterWithCustomProperties(msg, customProperties, delay)
+}
+
 type multiTopicConsumer struct {
        client *client
 
@@ -259,28 +292,7 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
 
 func (c *multiTopicConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
        delay time.Duration) {
-       names, err := validateTopicNames(msg.Topic())
-       if err != nil {
-               c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(), err)
-               return
-       }
-       if len(names) != 1 {
-               c.log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(), names)
-               return
-       }
-
-       tn := names[0]
-       fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
-       consumer, ok := c.consumers[fqdnTopic]
-       if !ok {
-               // check to see if the topic with the partition part is in the consumers
-               // this can happen when the consumer is configured to consume from a specific partition
-               if consumer, ok = c.consumers[tn.Name]; !ok {
-                       c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic())
-                       return
-               }
-       }
-       consumer.ReconsumeLaterWithCustomProperties(msg, customProperties, delay)
+       reconsumeLaterWithMultipleTopics(c.consumers, c.log, msg, customProperties, delay)
 }
 
 func (c *multiTopicConsumer) Nack(msg Message) {
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index ced770e9..2548eaa7 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "fmt"
        "regexp"
+       "slices"
        "strings"
        "sync"
        "time"
@@ -186,12 +187,15 @@ func (c *regexConsumer) Ack(msg Message) error {
        return c.AckID(msg.ID())
 }
 
-func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) {
-       c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
+func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+       c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
 }
 
-func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _ map[string]string, _ time.Duration) {
-       c.log.Warnf("regexp consumer not support ReconsumeLaterWithCustomProperties yet.")
+func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
+       delay time.Duration) {
+       c.consumersLock.Lock()
+       defer c.consumersLock.Unlock()
+       reconsumeLaterWithMultipleTopics(c.consumers, c.log, msg, customProperties, delay)
 }
 
 // AckID the consumption of a single message, identified by its MessageID
@@ -454,6 +458,14 @@ func (c *regexConsumer) topics() ([]string, error) {
        }
 
        filtered := filterTopics(topics, c.pattern)
+
+       if c.options.RetryEnable && c.options.DLQ != nil {
+               retryTopic := c.options.DLQ.RetryLetterTopic
+               if retryTopic != "" && !slices.Contains(filtered, retryTopic) {
+                       filtered = append(filtered, retryTopic)
+               }
+       }
+
        return filtered, nil
 }
 
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index e1e2ca29..35ba1c28 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/pulsar-client-go/pulsaradmin"
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
+       "github.com/google/uuid"
 
        "github.com/stretchr/testify/assert"
 
@@ -513,3 +514,70 @@ func TestRegexTopicGetLastMessageIDs(t *testing.T) {
                assert.Equal(t, 1, len(messages))
        }
 }
+
+func TestRegexConsumerReconsumeLater(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic1 := fmt.Sprintf("regex-reconsume-topic-%v", uuid.NewString())
+       assert.Nil(t, createPartitionedTopic(topic1, 1))
+
+       topic2 := fmt.Sprintf("regex-reconsume-topic-%v", uuid.NewString())
+       assert.Nil(t, createPartitionedTopic(topic2, 1))
+
+       topics := []string{topic1, topic2}
+
+       // create consumer
+       topicsPattern := "persistent://public/default/regex-reconsume-topic-.*"
+       consumer, err := client.Subscribe(ConsumerOptions{
+               TopicsPattern:    topicsPattern,
+               SubscriptionName: "my-sub",
+               Type:             Shared,
+               RetryEnable:      true,
+               DLQ: &DLQPolicy{
+                       MaxDeliveries: 2,
+               },
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // produce messages
+       for i, topic := range topics {
+               p, err := client.CreateProducer(ProducerOptions{
+                       Topic:           topic,
+                       DisableBatching: true,
+               })
+               if !assert.Nil(t, err) {
+                       t.Fatal()
+               }
+
+               err = genMessages(p, 1, func(_ int) string {
+                       return fmt.Sprintf("topic-%d-hello", i+1)
+               })
+               assert.Nil(t, err)
+
+               p.Close()
+       }
+
+       // we should receive one message on each topic, then one retry for each of those
+       for range 2 * len(topics) {
+               func() {
+                       ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+                       defer cancel()
+
+                       msg, err := consumer.Receive(ctx)
+                       if !assert.Nil(t, err) {
+                               t.Fatal()
+                       }
+
+                       if strings.HasSuffix(msg.Topic(), RetryTopicSuffix) {
+                               assert.Nil(t, consumer.Ack(msg))
+                       } else {
+                               consumer.ReconsumeLater(msg, time.Second)
+                       }
+               }()
+       }
+}

Reply via email to