crossoverJie commented on code in PR #1456:
URL: https://github.com/apache/pulsar-client-go/pull/1456#discussion_r2693268450
##########
pulsar/consumer_regex.go:
##########
@@ -186,12 +186,39 @@ func (c *regexConsumer) Ack(msg Message) error {
return c.AckID(msg.ID())
}
-func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) {
- c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
+func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+ c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
}
-func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _
map[string]string, _ time.Duration) {
- c.log.Warnf("regexp consumer not support
ReconsumeLaterWithCustomProperties yet.")
+func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message,
customProperties map[string]string,
+ delay time.Duration) {
+ names, err := validateTopicNames(msg.Topic())
+ if err != nil {
+ c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(),
err)
+ return
+ }
+ if len(names) != 1 {
+ c.log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(),
names)
+ return
+ }
+
+ tn := names[0]
+ fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
+
+ c.consumersLock.Lock()
+ defer c.consumersLock.Unlock()
+
+ consumer, ok := c.consumers[fqdnTopic]
+ if !ok {
+ // check to see if the topic with the partition part is in the
consumers
+ // this can happen when the consumer is configured to consume
from a specific partition
+ if consumer, ok = c.consumers[tn.Name]; !ok {
+ c.log.Warnf("consumer of topic %s not exist
unexpectedly", msg.Topic())
+ return
+ }
+ }
+
Review Comment:
https://github.com/apache/pulsar-client-go/blob/ffdc3afec23679e86b41ebb96b25c723f2ec5e05/pulsar/consumer_multitopic.go#L260-L283
This logic is essentially the same as the corresponding code in
`multiTopicConsumer` (linked above). Can we reuse that code instead of
duplicating it here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]