This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 875f6ba8 fix: seek race (#1265)
875f6ba8 is described below
commit 875f6ba883ebfcc9eb02fb339bcb3446bd0cbde4
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Nov 4 14:51:30 2024 +0800
fix: seek race (#1265)
* fix: use same goroutine to perform reconnect and seek
* fix: pause dispatch message before performing seek
* use chan struct{} instead of chan bool
* add log when seek
---
pulsar/consumer_impl.go | 22 +++++----
pulsar/consumer_partition.go | 109 +++++++++++++++++++++++++------------------
2 files changed, 76 insertions(+), 55 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 92376ac1..740a7df9 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -712,22 +712,29 @@ func (c *consumer) Seek(msgID MessageID) error {
return err
}
- if err := c.consumers[msgID.PartitionIdx()].Seek(msgID); err != nil {
- return err
- }
-
+ consumer := c.consumers[msgID.PartitionIdx()]
+ consumer.pauseDispatchMessage()
// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}
- return nil
+ return consumer.Seek(msgID)
}
func (c *consumer) SeekByTime(time time.Time) error {
c.Lock()
defer c.Unlock()
var errs error
+
+ for _, cons := range c.consumers {
+ cons.pauseDispatchMessage()
+ }
+ // clear messageCh
+ for len(c.messageCh) > 0 {
+ <-c.messageCh
+ }
+
// run SeekByTime on every partition of topic
for _, cons := range c.consumers {
if err := cons.SeekByTime(time); err != nil {
@@ -736,11 +743,6 @@ func (c *consumer) SeekByTime(time time.Time) error {
}
}
- // clear messageCh
- for len(c.messageCh) > 0 {
- <-c.messageCh
- }
-
return errs
}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 0e8274e6..4e8fba5a 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -24,6 +24,7 @@ import (
"math"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/apache/pulsar-client-go/pulsar/backoff"
@@ -185,6 +186,16 @@ type partitionConsumer struct {
redirectedClusterURI string
backoffPolicyFunc func() backoff.Policy
+
+ dispatcherSeekingControlCh chan struct{}
+ isSeeking atomic.Bool
+}
+
+// pauseDispatchMessage used to discard the message in the dispatcher goroutine.
+// This method will be called when the parent consumer performs the seek operation.
+// After the seek operation, the dispatcher will continue dispatching messages automatically.
+func (pc *partitionConsumer) pauseDispatchMessage() {
+ pc.dispatcherSeekingControlCh <- struct{}{}
}
func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
@@ -329,27 +340,28 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
}
pc := &partitionConsumer{
- parentConsumer: parent,
- client: client,
- options: options,
- topic: options.topic,
- name: options.consumerName,
- consumerID: client.rpcClient.NewConsumerID(),
- partitionIdx: int32(options.partitionIdx),
- eventsCh: make(chan interface{}, 10),
- maxQueueSize: int32(options.receiverQueueSize),
- queueCh: make(chan *message, options.receiverQueueSize),
- startMessageID: atomicMessageID{msgID: options.startMessageID},
- connectedCh: make(chan struct{}),
- messageCh: messageCh,
- connectClosedCh: make(chan *connectionClosed, 1),
- closeCh: make(chan struct{}),
- clearQueueCh: make(chan func(id *trackingMessageID)),
- compressionProviders: sync.Map{},
- dlq: dlq,
- metrics: metrics,
- schemaInfoCache: newSchemaInfoCache(client, options.topic),
- backoffPolicyFunc: boFunc,
+ parentConsumer: parent,
+ client: client,
+ options: options,
+ topic: options.topic,
+ name: options.consumerName,
+ consumerID: client.rpcClient.NewConsumerID(),
+ partitionIdx: int32(options.partitionIdx),
+ eventsCh: make(chan interface{}, 10),
+ maxQueueSize: int32(options.receiverQueueSize),
+ queueCh: make(chan *message, options.receiverQueueSize),
+ startMessageID: atomicMessageID{msgID: options.startMessageID},
+ connectedCh: make(chan struct{}),
+ messageCh: messageCh,
+ connectClosedCh: make(chan *connectionClosed, 1),
+ closeCh: make(chan struct{}),
+ clearQueueCh: make(chan func(id *trackingMessageID)),
+ compressionProviders: sync.Map{},
+ dlq: dlq,
+ metrics: metrics,
+ schemaInfoCache: newSchemaInfoCache(client, options.topic),
+ backoffPolicyFunc: boFunc,
+ dispatcherSeekingControlCh: make(chan struct{}),
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
@@ -1440,17 +1452,20 @@ func (pc *partitionConsumer) dispatcher() {
}
nextMessageSize = queueMsg.size()
- if pc.dlq.shouldSendToDlq(&nextMessage) {
- // pass the message to the DLQ router
- pc.metrics.DlqCounter.Inc()
- messageCh = pc.dlq.Chan()
+ if !pc.isSeeking.Load() {
+ if pc.dlq.shouldSendToDlq(&nextMessage) {
+ // pass the message to the DLQ router
+ pc.metrics.DlqCounter.Inc()
+ messageCh = pc.dlq.Chan()
+ } else {
+ // pass the message to application channel
+ messageCh = pc.messageCh
+ }
+ pc.metrics.PrefetchedMessages.Dec()
+ pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
} else {
- // pass the message to application channel
- messageCh = pc.messageCh
+ pc.log.Debug("skip dispatching messages when seeking")
}
-
- pc.metrics.PrefetchedMessages.Dec()
- pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
} else {
queueCh = pc.queueCh
}
@@ -1483,6 +1498,13 @@ func (pc *partitionConsumer) dispatcher() {
pc.log.WithError(err).Error("unable to send initial permits to broker")
}
+ case _, ok := <-pc.dispatcherSeekingControlCh:
+ if !ok {
+ return
+ }
+ pc.log.Debug("received dispatcherSeekingControlCh, set isSeek to true")
+ pc.isSeeking.Store(true)
+
case msg, ok := <-queueCh:
if !ok {
return
@@ -1587,22 +1609,16 @@ func (pc *partitionConsumer) runEventsLoop() {
}()
pc.log.Debug("get into runEventsLoop")
- go func() {
- for {
- select {
- case <-pc.closeCh:
- pc.log.Info("close consumer, exit reconnect")
- return
- case connectionClosed := <-pc.connectClosedCh:
- pc.log.Debug("runEventsLoop will reconnect")
- pc.reconnectToBroker(connectionClosed)
- }
- }
- }()
-
for {
- for i := range pc.eventsCh {
- switch v := i.(type) {
+ select {
+ case <-pc.closeCh:
+ pc.log.Info("close consumer, exit reconnect")
+ return
+ case connectionClosed := <-pc.connectClosedCh:
+ pc.log.Debug("runEventsLoop will reconnect")
+ pc.reconnectToBroker(connectionClosed)
+ case event := <-pc.eventsCh:
+ switch v := event.(type) {
case *ackRequest:
pc.internalAck(v)
case *ackWithTxnRequest:
@@ -1680,6 +1696,9 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}
func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
+ if pc.isSeeking.CompareAndSwap(true, false) {
+ pc.log.Debug("seek operation triggers reconnection, and reset isSeeking")
+ }
var (
maxRetry int
delayReconnectTime, totalDelayReconnectTime time.Duration