crossoverJie commented on code in PR #1265:
URL: https://github.com/apache/pulsar-client-go/pull/1265#discussion_r1825671793


##########
pulsar/consumer_partition.go:
##########
@@ -1440,17 +1452,18 @@ func (pc *partitionConsumer) dispatcher() {
                        }
                        nextMessageSize = queueMsg.size()
 
-                       if pc.dlq.shouldSendToDlq(&nextMessage) {
-                               // pass the message to the DLQ router
-                               pc.metrics.DlqCounter.Inc()
-                               messageCh = pc.dlq.Chan()
-                       } else {
-                               // pass the message to application channel
-                               messageCh = pc.messageCh
+                       if !pc.isSeeking.Load() {
+                               if pc.dlq.shouldSendToDlq(&nextMessage) {
+                                       // pass the message to the DLQ router
+                                       pc.metrics.DlqCounter.Inc()
+                                       messageCh = pc.dlq.Chan()
+                               } else {
+                                       // pass the message to application 
channel
+                                       messageCh = pc.messageCh
+                               }
+                               pc.metrics.PrefetchedMessages.Dec()
+                               
pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))

Review Comment:
   If messages can no longer be distributed, it is recommended to print a 
warning log to facilitate user troubleshooting.



##########
pulsar/consumer_partition.go:
##########
@@ -185,6 +186,16 @@ type partitionConsumer struct {
 
        redirectedClusterURI string
        backoffPolicyFunc    func() backoff.Policy
+
+       dispatcherSeekingControlCh chan bool

Review Comment:
   Can `dispatcherSeekingControlCh chan struct{}`  be used instead?



##########
pulsar/consumer_partition.go:
##########
@@ -1440,17 +1452,18 @@ func (pc *partitionConsumer) dispatcher() {
                        }
                        nextMessageSize = queueMsg.size()
 
-                       if pc.dlq.shouldSendToDlq(&nextMessage) {
-                               // pass the message to the DLQ router
-                               pc.metrics.DlqCounter.Inc()
-                               messageCh = pc.dlq.Chan()
-                       } else {
-                               // pass the message to application channel
-                               messageCh = pc.messageCh
+                       if !pc.isSeeking.Load() {

Review Comment:
   Why not use channel for communication here? but use memory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to