jordanfitz commented on code in PR #1456:
URL: https://github.com/apache/pulsar-client-go/pull/1456#discussion_r2695345207


##########
pulsar/consumer_regex.go:
##########
@@ -186,12 +186,39 @@ func (c *regexConsumer) Ack(msg Message) error {
        return c.AckID(msg.ID())
 }
 
-func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) {
-       c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
+func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+       c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
 }
 
-func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _ 
map[string]string, _ time.Duration) {
-       c.log.Warnf("regexp consumer not support 
ReconsumeLaterWithCustomProperties yet.")
+func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message, 
customProperties map[string]string,
+       delay time.Duration) {
+       names, err := validateTopicNames(msg.Topic())
+       if err != nil {
+               c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(), 
err)
+               return
+       }
+       if len(names) != 1 {
+               c.log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(), 
names)
+               return
+       }
+
+       tn := names[0]
+       fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
+
+       c.consumersLock.Lock()
+       defer c.consumersLock.Unlock()
+
+       consumer, ok := c.consumers[fqdnTopic]
+       if !ok {
+               // check to see if the topic with the partition part is in the 
consumers
+               // this can happen when the consumer is configured to consume 
from a specific partition
+               if consumer, ok = c.consumers[tn.Name]; !ok {
+                       c.log.Warnf("consumer of topic %s not exist 
unexpectedly", msg.Topic())
+                       return
+               }
+       }
+

Review Comment:
   I went ahead and added a shared func called `reconsumeLaterWithMultipleTopics`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to