Copilot commented on code in PR #1456:
URL: https://github.com/apache/pulsar-client-go/pull/1456#discussion_r2689351938
##########
pulsar/consumer_regex.go:
##########
@@ -454,6 +476,11 @@ func (c *regexConsumer) topics() ([]string, error) {
}
filtered := filterTopics(topics, c.pattern)
+
+ if c.options.RetryEnable {
Review Comment:
Consider adding a nil check for c.options.DLQ before accessing
c.options.DLQ.RetryLetterTopic for defensive programming. While the current
code path through newConsumer ensures DLQ is initialized when RetryEnable is
true, adding this check would make the code more robust and easier to maintain.
```suggestion
if c.options.RetryEnable && c.options.DLQ != nil {
```
##########
pulsar/consumer_regex.go:
##########
@@ -186,12 +186,34 @@ func (c *regexConsumer) Ack(msg Message) error {
return c.AckID(msg.ID())
}
-func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) {
- c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
+func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+ c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
}
-func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _
map[string]string, _ time.Duration) {
- c.log.Warnf("regexp consumer not support
ReconsumeLaterWithCustomProperties yet.")
+func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message,
customProperties map[string]string,
+ delay time.Duration) {
+ names, err := validateTopicNames(msg.Topic())
+ if err != nil {
+ c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(),
err)
+ return
+ }
+ if len(names) != 1 {
+ c.log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(),
names)
+ return
+ }
+
+ tn := names[0]
+ fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
+ consumer, ok := c.consumers[fqdnTopic]
+ if !ok {
+ // check to see if the topic with the partition part is in the
consumers
+ // this can happen when the consumer is configured to consume
from a specific partition
+ if consumer, ok = c.consumers[tn.Name]; !ok {
+ c.log.Warnf("consumer of topic %s not exist
unexpectedly", msg.Topic())
+ return
+ }
+ }
+ consumer.ReconsumeLaterWithCustomProperties(msg, customProperties,
delay)
Review Comment:
There's a potential race condition here. The consumers map is being accessed
without holding consumersLock, while the discover() method (which runs in a
separate goroutine) modifies this map with the lock held. This could lead to
concurrent map read/write panics. The map access should be protected with
consumersLock to ensure thread safety.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]