This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 71bec03 Added negative acks tracker and integration (#94) 71bec03 is described below commit 71bec030ccd7bc19014d2211ec04487bdb66dac8 Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Nov 12 11:07:06 2019 -0800 Added negative acks tracker and integration (#94) * Added negative acks tracker and integration * Fixed race condition in tests * Fixed return after warning * fixed format --- examples/consumer-listener/consumer-listener.go | 4 +- examples/consumer/consumer.go | 4 +- perf/perf-consumer.go | 4 +- pulsar/consumer.go | 27 +++++- pulsar/consumer_impl.go | 46 +++++++-- pulsar/consumer_partition.go | 74 ++++++++++----- pulsar/consumer_test.go | 27 ++---- pulsar/impl_producer.go | 6 +- pulsar/negative_acks_tracker.go | 110 +++++++++++++++++++++ pulsar/negative_acks_tracker_test.go | 121 ++++++++++++++++++++++++ pulsar/producer_test.go | 3 +- 11 files changed, 358 insertions(+), 68 deletions(-) diff --git a/examples/consumer-listener/consumer-listener.go b/examples/consumer-listener/consumer-listener.go index 0a8e6c9..618c3c0 100644 --- a/examples/consumer-listener/consumer-listener.go +++ b/examples/consumer-listener/consumer-listener.go @@ -57,8 +57,6 @@ func main() { fmt.Printf("Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload())) - if err := consumer.Ack(msg); err != nil { - log.Fatal(err) - } + consumer.Ack(msg) } } diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go index 5250a02..0b819bc 100644 --- a/examples/consumer/consumer.go +++ b/examples/consumer/consumer.go @@ -52,9 +52,7 @@ func main() { fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload())) - if err := consumer.Ack(msg); err != nil { - log.Fatal(err) - } + consumer.Ack(msg) } if err := consumer.Unsubscribe(); err != nil { diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go index 8fe2a86..582c06d 100644 --- 
a/perf/perf-consumer.go +++ b/perf/perf-consumer.go @@ -101,9 +101,7 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) { } msgReceived++ bytesReceived += int64(len(cm.Message.Payload())) - if err := consumer.Ack(cm.Message); err != nil { - return - } + consumer.Ack(cm.Message) case <-tick.C: currentMsgReceived := atomic.SwapInt64(&msgReceived, 0) currentBytesReceived := atomic.SwapInt64(&bytesReceived, 0) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index b0bba1b..961611e 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -19,6 +19,7 @@ package pulsar import ( "context" + "time" ) // Pair of a Consumer and Message @@ -106,6 +107,10 @@ type ConsumerOptions struct { // ReceiverQueueSize(int) if the total exceeds this value (default: 50000). MaxTotalReceiverQueueSizeAcrossPartitions int + // The delay after which to redeliver the messages that failed to be + // processed. Default is 1min. (See `Consumer.Nack()`) + NackRedeliveryDelay *time.Duration + // Set the consumer name. Name string @@ -136,10 +141,28 @@ type Consumer interface { Chan() <-chan ConsumerMessage // Ack the consumption of a single message - Ack(Message) error + Ack(Message) // AckID the consumption of a single message, identified by its MessageID - AckID(MessageID) error + AckID(MessageID) + + // Acknowledge the failure to process a single message. + // + // When a message is "negatively acked" it will be marked for redelivery after + // some fixed delay. The delay is configurable when constructing the consumer + // with ConsumerOptions.NackRedeliveryDelay . + // + // This call is not blocking. + Nack(Message) + + // Acknowledge the failure to process a single message, identified by its MessageID. + // + // When a message is "negatively acked" it will be marked for redelivery after + // some fixed delay. The delay is configurable when constructing the consumer + // with ConsumerOptions.NackRedeliveryDelay . + // + // This call is not blocking.
+ NackID(MessageID) // Close the consumer and stop the broker to push more messages Close() error diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 8b73631..0a628b7 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -33,6 +33,8 @@ import ( var ErrConsumerClosed = errors.New("consumer closed") +const defaultNackRedeliveryDelay = 1 * time.Minute + type consumer struct { options ConsumerOptions @@ -117,6 +119,13 @@ func topicSubscribe(client *client, options ConsumerOptions, topic string, wg.Add(1) go func(idx int, pt string) { defer wg.Done() + + var nackRedeliveryDelay time.Duration + if options.NackRedeliveryDelay == nil { + nackRedeliveryDelay = defaultNackRedeliveryDelay + } else { + nackRedeliveryDelay = *options.NackRedeliveryDelay + } opts := &partitionConsumerOpts{ topic: pt, consumerName: consumerName, @@ -125,6 +134,7 @@ func topicSubscribe(client *client, options ConsumerOptions, topic string, subscriptionInitPos: options.SubscriptionInitialPosition, partitionIdx: idx, receiverQueueSize: receiverQueueSize, + nackRedeliveryDelay: nackRedeliveryDelay, } cons, err := newPartitionConsumer(consumer, client, opts, messageCh) ch <- ConsumerError{ @@ -199,24 +209,48 @@ func (c *consumer) Chan() <-chan ConsumerMessage { } // Ack the consumption of a single message -func (c *consumer) Ack(msg Message) error { - return c.AckID(msg.ID()) +func (c *consumer) Ack(msg Message) { + c.AckID(msg.ID()) } // Ack the consumption of a single message, identified by its MessageID -func (c *consumer) AckID(msgID MessageID) error { +func (c *consumer) AckID(msgID MessageID) { mid, ok := msgID.(*messageID) if !ok { - return fmt.Errorf("invalid message id type") + c.log.Warnf("invalid message id type") + return } partition := mid.partitionIdx // did we receive a valid partition index? 
if partition < 0 || partition >= len(c.consumers) { - return fmt.Errorf("invalid partition index %d expected a partition between [0-%d]", + c.log.Warnf("invalid partition index %d expected a partition between [0-%d]", partition, len(c.consumers)) + return } - return c.consumers[partition].AckID(msgID) + c.consumers[partition].AckID(mid) +} + +func (c *consumer) Nack(msg Message) { + c.NackID(msg.ID()) +} + +func (c *consumer) NackID(msgID MessageID) { + mid, ok := msgID.(*messageID) + if !ok { + c.log.Warnf("invalid message id type") + return + } + + partition := mid.partitionIdx + // did we receive a valid partition index? + if partition < 0 || partition >= len(c.consumers) { + c.log.Warnf("invalid partition index %d expected a partition between [0-%d]", + partition, len(c.consumers)) + return + } + + c.consumers[partition].NackID(mid) } func (c *consumer) Close() error { diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index aaf6ade..a8d9b6c 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -47,6 +47,7 @@ type partitionConsumerOpts struct { subscriptionInitPos SubscriptionInitialPosition partitionIdx int receiverQueueSize int + nackRedeliveryDelay time.Duration } type partitionConsumer struct { @@ -78,6 +79,8 @@ type partitionConsumer struct { connectedCh chan struct{} closeCh chan struct{} + nackTracker *negativeAcksTracker + log *log.Entry } @@ -101,6 +104,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon log: log.WithField("topic", options.topic), } pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription) + pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay) err := pc.grabConn() if err != nil { @@ -143,19 +147,39 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) { pc.conn.DeleteConsumeHandler(pc.consumerID) } -func (pc *partitionConsumer) Ack(msg Message) error { - return pc.AckID(msg.ID())
-} - -func (pc *partitionConsumer) AckID(msgID MessageID) error { +func (pc *partitionConsumer) AckID(msgID *messageID) { req := &ackRequest{ - doneCh: make(chan struct{}), - msgID: msgID, + msgID: msgID, } pc.eventsCh <- req +} - <-req.doneCh - return req.err +func (pc *partitionConsumer) NackID(msgID *messageID) { + pc.nackTracker.Add(msgID) +} + +func (pc *partitionConsumer) Redeliver(msgIds []messageID) { + pc.eventsCh <- &redeliveryRequest{msgIds} +} + +func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) { + msgIds := req.msgIds + pc.log.Debug("Request redelivery after negative ack for messages", msgIds) + + msgIdDataList := make([]*pb.MessageIdData, len(msgIds)) + for i := 0; i < len(msgIds); i++ { + msgIdDataList[i] = &pb.MessageIdData{ + LedgerId: proto.Uint64(uint64(msgIds[i].ledgerID)), + EntryId: proto.Uint64(uint64(msgIds[i].entryID)), + } + } + + requestID := internal.RequestIDNoResponse + pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, + pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{ + ConsumerId: proto.Uint64(pc.consumerID), + MessageIds: msgIdDataList, + }) } func (pc *partitionConsumer) Close() error { @@ -172,28 +196,21 @@ func (pc *partitionConsumer) Close() error { } func (pc *partitionConsumer) internalAck(req *ackRequest) { - defer close(req.doneCh) + msgId := req.msgID - id := &pb.MessageIdData{} - messageIDs := make([]*pb.MessageIdData, 0) - err := proto.Unmarshal(req.msgID.Serialize(), id) - if err != nil { - pc.log.WithError(err).Error("unable to serialize message id") - req.err = err + messageIDs := make([]*pb.MessageIdData, 1) + messageIDs[0] = &pb.MessageIdData{ + LedgerId: proto.Uint64(uint64(msgId.ledgerID)), + EntryId: proto.Uint64(uint64(msgId.entryID)), } - - messageIDs = append(messageIDs, id) requestID := internal.RequestIDNoResponse cmdAck := &pb.CommandAck{ ConsumerId: proto.Uint64(pc.consumerID), MessageId: messageIDs, AckType: 
pb.CommandAck_Individual.Enum(), } - _, err = pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, pb.BaseCommand_ACK, cmdAck) - if err != nil { - pc.log.WithError(err).Errorf("failed to ack message_id=%s", id) - req.err = err - } + + pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, pb.BaseCommand_ACK, cmdAck) } func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error { @@ -369,9 +386,7 @@ func (pc *partitionConsumer) dispatcher() { } type ackRequest struct { - doneCh chan struct{} - msgID MessageID - err error + msgID *messageID } type unsubscribeRequest struct { @@ -384,6 +399,10 @@ type closeRequest struct { err error } +type redeliveryRequest struct { + msgIds []messageID +} + func (pc *partitionConsumer) runEventsLoop() { defer func() { pc.log.Info("exiting events loop") @@ -396,6 +415,8 @@ func (pc *partitionConsumer) runEventsLoop() { switch v := i.(type) { case *ackRequest: pc.internalAck(v) + case *redeliveryRequest: + pc.internalRedeliver(v) case *unsubscribeRequest: pc.internalUnsubscribe(v) case *connectionClosed: @@ -429,6 +450,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) { pc.log.Info("Closed consumer") pc.state = consumerClosed pc.conn.DeleteConsumeHandler(pc.consumerID) + pc.nackTracker.Close() close(pc.closeCh) } } diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 33c2f15..c45b932 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -90,9 +90,7 @@ func TestProducerConsumer(t *testing.T) { assert.Equal(t, expectProperties, msg.Properties()) // ack message - if err := consumer.Ack(msg); err != nil { - log.Fatal(err) - } + consumer.Ack(msg) } } @@ -163,8 +161,7 @@ func TestBatchMessageReceive(t *testing.T) { for i := 0; i < numOfMessages; i++ { msg, err := consumer.Receive(ctx) assert.Nil(t, err) - err = consumer.Ack(msg) - assert.Nil(t, err) + consumer.Ack(msg) count++ } @@ -301,17 +298,13 @@ func TestConsumerKeyShared(t 
*testing.T) { break } receivedConsumer1++ - if err := consumer1.Ack(cm.Message); err != nil { - log.Fatal(err) - } + consumer1.Ack(cm.Message) case cm, ok := <-consumer2.Chan(): if !ok { break } receivedConsumer2++ - if err := consumer2.Ack(cm.Message); err != nil { - log.Fatal(err) - } + consumer2.Ack(cm.Message) } } @@ -372,9 +365,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) { fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload())) - if err := consumer.Ack(msg); err != nil { - assert.Nil(t, err) - } + consumer.Ack(msg) } assert.Equal(t, len(msgs), 10) @@ -465,9 +456,7 @@ func TestConsumerShared(t *testing.T) { payload := string(cm.Message.Payload()) messages[payload] = struct{}{} fmt.Printf("consumer1 msg id is: %v, value is: %s\n", cm.Message.ID(), payload) - if err := consumer1.Ack(cm.Message); err != nil { - log.Fatal(err) - } + consumer1.Ack(cm.Message) case cm, ok := <-consumer2.Chan(): if !ok { break @@ -476,9 +465,7 @@ func TestConsumerShared(t *testing.T) { payload := string(cm.Message.Payload()) messages[payload] = struct{}{} fmt.Printf("consumer2 msg id is: %v, value is: %s\n", cm.Message.ID(), payload) - if err := consumer2.Ack(cm.Message); err != nil { - log.Fatal(err) - } + consumer2.Ack(cm.Message) } } diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go index d806087..b0620c2 100644 --- a/pulsar/impl_producer.go +++ b/pulsar/impl_producer.go @@ -146,9 +146,9 @@ func (p *producer) LastSequenceID() int64 { func (p *producer) Flush() error { for _, pp := range p.producers { - if err := pp.Flush(); err != nil { - return err - } + if err := pp.Flush(); err != nil { + return err + } } return nil diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go new file mode 100644 index 0000000..e8a79e8 --- /dev/null +++ b/pulsar/negative_acks_tracker.go @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + log "github.com/sirupsen/logrus" + "sync" + "time" +) + +type redeliveryConsumer interface { + Redeliver(msgIds []messageID) +} + +type negativeAcksTracker struct { + sync.Mutex + + doneCh chan interface{} + negativeAcks map[messageID]time.Time + rc redeliveryConsumer + tick *time.Ticker + delay time.Duration +} + +func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration) *negativeAcksTracker { + t := &negativeAcksTracker{ + doneCh: make(chan interface{}), + negativeAcks: make(map[messageID]time.Time), + rc: rc, + tick: time.NewTicker(delay / 3), + delay: delay, + } + + go t.track() + return t +} + +func (t *negativeAcksTracker) Add(msgID *messageID) { + // Always clear up the batch index since we want to track the nack + // for the entire batch + batchMsgId := messageID{ + ledgerID: msgID.ledgerID, + entryID: msgID.entryID, + batchIdx: 0, + } + + t.Lock() + defer t.Unlock() + + _, present := t.negativeAcks[batchMsgId] + if present { + // The batch is already being tracked + return + } else { + targetTime := time.Now().Add(t.delay) + t.negativeAcks[batchMsgId] = targetTime + } +} + +func (t *negativeAcksTracker) track() { + for { + select { + case <-t.doneCh: + log.Debug("Closing nack tracker") + return + + case <-t.tick.C: + { + 
t.Lock() + + now := time.Now() + msgIds := make([]messageID, 0) + for msgID, targetTime := range t.negativeAcks { + log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now) + if targetTime.Before(now) { + log.Debugf("Adding MsgId: %v", msgID) + msgIds = append(msgIds, msgID) + delete(t.negativeAcks, msgID) + } + } + + t.Unlock() + + if len(msgIds) > 0 { + t.rc.Redeliver(msgIds) + } + } + + } + } +} + +func (t *negativeAcksTracker) Close() { + t.doneCh <- nil +} diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go new file mode 100644 index 0000000..733114a --- /dev/null +++ b/pulsar/negative_acks_tracker_test.go @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package pulsar + +import ( + "github.com/stretchr/testify/assert" + "sort" + "sync" + "testing" + "time" +) + +type nackMockedConsumer struct { + sync.Mutex + cond *sync.Cond + msgIds []messageID +} + +func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) { + nmc.Lock() + if nmc.msgIds == nil { + nmc.msgIds = msgIds + sort.Slice(msgIds, func(i, j int) bool { + return msgIds[i].ledgerID < msgIds[j].entryID + }) + nmc.cond.Signal() + } + + nmc.Unlock() +} + +func (nmc *nackMockedConsumer) Wait() []messageID { + nmc.Lock() + defer nmc.Unlock() + nmc.cond.Wait() + + return nmc.msgIds +} + +func TestNacksTracker(t *testing.T) { + nmc := &nackMockedConsumer{} + nmc.cond = sync.NewCond(nmc) + nacks := newNegativeAcksTracker(nmc, 1*time.Second) + + nacks.Add(&messageID{ + ledgerID: 1, + entryID: 1, + batchIdx: 1, + }) + + nacks.Add(&messageID{ + ledgerID: 2, + entryID: 2, + batchIdx: 1, + }) + + msgIds := nmc.Wait() + + assert.Equal(t, 2, len(msgIds)) + assert.Equal(t, int64(1), msgIds[0].ledgerID) + assert.Equal(t, int64(1), msgIds[0].entryID) + assert.Equal(t, int64(2), msgIds[1].ledgerID) + assert.Equal(t, int64(2), msgIds[1].entryID) + + nacks.Close() +} + +func TestNacksWithBatchesTracker(t *testing.T) { + nmc := &nackMockedConsumer{} + nmc.cond = sync.NewCond(nmc) + nacks := newNegativeAcksTracker(nmc, 1*time.Second) + + nacks.Add(&messageID{ + ledgerID: 1, + entryID: 1, + batchIdx: 1, + }) + + nacks.Add(&messageID{ + ledgerID: 1, + entryID: 1, + batchIdx: 2, + }) + + nacks.Add(&messageID{ + ledgerID: 1, + entryID: 1, + batchIdx: 3, + }) + + nacks.Add(&messageID{ + ledgerID: 2, + entryID: 2, + batchIdx: 1, + }) + + msgIds := nmc.Wait() + + assert.Equal(t, 2, len(msgIds)) + assert.Equal(t, int64(1), msgIds[0].ledgerID) + assert.Equal(t, int64(1), msgIds[0].entryID) + assert.Equal(t, int64(2), msgIds[1].ledgerID) + assert.Equal(t, int64(2), msgIds[1].entryID) + + nacks.Close() +} diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 
052e92a..1961172 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -434,8 +434,7 @@ func TestFlushInPartitionedProducer(t *testing.T) { fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload())) assert.Nil(t, err) - err = consumer.Ack(msg) - assert.Nil(t, err) + consumer.Ack(msg) msgCount++ } assert.Equal(t, msgCount, numOfMessages/2)