This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new e78dc3c Introduce doneCh for ack error (#777) e78dc3c is described below commit e78dc3c28372fde1e1ce40f727ff183902be59c7 Author: xiaolong ran <xiaolong...@tencent.com> AuthorDate: Tue Sep 27 09:16:29 2022 +0800 Introduce doneCh for ack error (#777) * Introduce doneCh for ack error Signed-off-by: xiaolongran <xiaolong...@tencent.com> * remove consumer partition test file Signed-off-by: xiaolongran <xiaolong...@tencent.com> * Refactor ack response func Signed-off-by: xiaolongran <xiaolong...@tencent.com> * fix ack error Signed-off-by: xiaolongran <xiaolong...@tencent.com> Signed-off-by: xiaolongran <xiaolong...@tencent.com> --- pulsar/consumer_impl.go | 6 ++++++ pulsar/consumer_multitopic.go | 4 ++++ pulsar/consumer_partition.go | 31 +++++++++++++++++++++++++++++-- pulsar/consumer_regex.go | 4 ++++ pulsar/impl_message.go | 11 +++++++++++ 5 files changed, 54 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 2328ca8..e6135bf 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -36,7 +36,9 @@ import ( const defaultNackRedeliveryDelay = 1 * time.Minute type acker interface { + // AckID does not handle errors returned by the Broker side, so no need to wait for doneCh to finish. AckID(id trackingMessageID) error + AckIDWithResponse(id trackingMessageID) error NackID(id trackingMessageID) NackMsg(msg Message) } @@ -462,6 +464,10 @@ func (c *consumer) AckID(msgID MessageID) error { return mid.Ack() } + if c.options.AckWithResponse { + return c.consumers[mid.partitionIdx].AckIDWithResponse(mid) + } + return c.consumers[mid.partitionIdx].AckID(mid) } diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 1d75a24..380dd75 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -136,6 +136,10 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error { return errors.New("unable to ack message because consumer is nil") } + if c.options.AckWithResponse { + return mid.AckWithResponse() + } + return mid.Ack() } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 50fa3c4..cc9e710 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -366,6 +366,29 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error return convertToMessageID(id), nil } +func (pc *partitionConsumer) AckIDWithResponse(msgID trackingMessageID) error { + if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { + pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") + return errors.New("consumer state is closed") + } + + ackReq := new(ackRequest) + ackReq.doneCh = make(chan struct{}) + if !msgID.Undefined() && msgID.ack() { + pc.metrics.AcksCounter.Inc() + pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9) + ackReq.msgID = msgID + // send ack request to eventsCh + pc.eventsCh <- ackReq + // wait for the request to complete + <-ackReq.doneCh + + pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) + } + + return ackReq.err +} + func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") @@ -373,12 +396,14 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { } ackReq := new(ackRequest) + ackReq.doneCh = make(chan struct{}) if !msgID.Undefined() && msgID.ack() { pc.metrics.AcksCounter.Inc() pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9) ackReq.msgID = msgID // send ack request to eventsCh pc.eventsCh <- ackReq + // No need to wait for ackReq.doneCh to finish pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) } @@ -562,6 +587,7 @@ func (pc *partitionConsumer) clearMessageChannels() { } func (pc *partitionConsumer) internalAck(req *ackRequest) { + defer close(req.doneCh) if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") return @@ -986,8 +1012,9 @@ func (pc *partitionConsumer) dispatcher() { } type ackRequest struct { - msgID trackingMessageID - err error + doneCh chan struct{} + msgID trackingMessageID + err error } type unsubscribeRequest struct { diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index e4d2077..c55a1c1 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -180,6 +180,10 @@ func (c *regexConsumer) AckID(msgID MessageID) error { return errors.New("consumer is nil in consumer_regex") } + if c.options.AckWithResponse { + return mid.AckWithResponse() + } + return mid.Ack() } diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index 067439f..d155ae7 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -75,6 +75,17 @@ func (id trackingMessageID) Ack() error { return nil } +func (id trackingMessageID) AckWithResponse() error { + if id.consumer == nil { + return errors.New("consumer is nil in trackingMessageID") + } + if id.ack() { + return id.consumer.AckIDWithResponse(id) + } + + return nil +} + func (id trackingMessageID) Nack() { if id.consumer == nil { return