wolfstudy commented on a change in pull request #660:
URL: https://github.com/apache/pulsar-client-go/pull/660#discussion_r743363658
##########
File path: pulsar/negative_acks_tracker.go
##########
@@ -35,22 +35,41 @@ type negativeAcksTracker struct {
doneOnce sync.Once
negativeAcks map[messageID]time.Time
rc redeliveryConsumer
- tick *time.Ticker
+ nackBackoff NackBackoffPolicy
+ trackFlag bool
delay time.Duration
log log.Logger
}
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
- t := &negativeAcksTracker{
- doneCh: make(chan interface{}),
- negativeAcks: make(map[messageID]time.Time),
- rc: rc,
- tick: time.NewTicker(delay / 3),
- delay: delay,
- log: logger,
- }
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+ nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
+
+ t := new(negativeAcksTracker)
+
+ // When using NackBackoffPolicy, the delay time needs to be calculated based on the RedeliveryCount field in
+ // the CommandMessage, so for the original default Nack() logic, we still keep the negativeAcksTracker created
+ // when we open a gorutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
+ // to execute the logic of `t.track()` when AddMessage().
+ if nackBackoffPolicy != nil {
Review comment:
Yes, I agree with your point. The if statement is there because, with a nack
backoff policy, we cannot know the nackDelayTime up front. We first have to
read the redeliveryCount from the CommandMessage and calculate the
nackDelayTime from it; only then can we create the time.NewTicker with an
interval based on that nackDelayTime. That dependency is why the if statement
was added.