YanshuoH commented on issue #1388:
URL:
https://github.com/apache/pulsar-client-go/issues/1388#issuecomment-3052398771
Looking at version v0.9.0, there is no real topic override in the
`dlq_router.run`
```
func (r *dlqRouter) run() {
for {
select {
case cm := <-r.messageCh:
r.log.WithField("msgID", cm.ID()).Debug("Got message
for DLQ")
producer :=
r.getProducer(cm.Consumer.(*consumer).options.Schema)
msg := cm.Message.(*message)
msgID := msg.ID()
producer.SendAsync(context.Background(),
&ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
OrderingKey: msg.OrderingKey(),
Properties: msg.Properties(),
EventTime: msg.EventTime(),
ReplicationClusters: msg.replicationClusters,
}, func(MessageID, *ProducerMessage, error) {
r.log.WithField("msgID", msgID).Debug("Sent
message to DLQ")
// The Producer ack might be coming from the
connection go-routine that
// is also used by the consumer. In that case
we would get a dead-lock
// if we'd try to ack.
go cm.Consumer.AckID(msgID)
})
case <-r.closeCh:
if r.producer != nil {
r.producer.Close()
}
r.log.Debug("Closed DLQ router")
return
}
}
}
```
I think it comes from this PR:
https://github.com/apache/pulsar-client-go/pull/907/files
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]