This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 71bec03  Added negative acks tracker and integration (#94)
71bec03 is described below

commit 71bec030ccd7bc19014d2211ec04487bdb66dac8
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue Nov 12 11:07:06 2019 -0800

    Added negative acks tracker and integration (#94)
    
    * Added negative acks tracker and integration
    
    * Fixed race condition in tests
    
    * Fixed return after warning
    
    * fixed format
---
 examples/consumer-listener/consumer-listener.go |   4 +-
 examples/consumer/consumer.go                   |   4 +-
 perf/perf-consumer.go                           |   4 +-
 pulsar/consumer.go                              |  27 +++++-
 pulsar/consumer_impl.go                         |  46 +++++++--
 pulsar/consumer_partition.go                    |  74 ++++++++++-----
 pulsar/consumer_test.go                         |  27 ++----
 pulsar/impl_producer.go                         |   6 +-
 pulsar/negative_acks_tracker.go                 | 110 +++++++++++++++++++++
 pulsar/negative_acks_tracker_test.go            | 121 ++++++++++++++++++++++++
 pulsar/producer_test.go                         |   3 +-
 11 files changed, 358 insertions(+), 68 deletions(-)

diff --git a/examples/consumer-listener/consumer-listener.go 
b/examples/consumer-listener/consumer-listener.go
index 0a8e6c9..618c3c0 100644
--- a/examples/consumer-listener/consumer-listener.go
+++ b/examples/consumer-listener/consumer-listener.go
@@ -57,8 +57,6 @@ func main() {
                fmt.Printf("Received message  msgId: %v -- content: '%s'\n",
                        msg.ID(), string(msg.Payload()))
 
-               if err := consumer.Ack(msg); err != nil {
-                       log.Fatal(err)
-               }
+               consumer.Ack(msg)
        }
 }
diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go
index 5250a02..0b819bc 100644
--- a/examples/consumer/consumer.go
+++ b/examples/consumer/consumer.go
@@ -52,9 +52,7 @@ func main() {
                fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
                        msg.ID(), string(msg.Payload()))
 
-               if err := consumer.Ack(msg); err != nil {
-                       log.Fatal(err)
-               }
+               consumer.Ack(msg)
        }
 
        if err := consumer.Unsubscribe(); err != nil {
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 8fe2a86..582c06d 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -101,9 +101,7 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan 
struct{}) {
                        }
                        msgReceived++
                        bytesReceived += int64(len(cm.Message.Payload()))
-                       if err := consumer.Ack(cm.Message); err != nil {
-                               return
-                       }
+                       consumer.Ack(cm.Message)
                case <-tick.C:
                        currentMsgReceived := atomic.SwapInt64(&msgReceived, 0)
                        currentBytesReceived := 
atomic.SwapInt64(&bytesReceived, 0)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index b0bba1b..961611e 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "context"
+       "time"
 )
 
 // Pair of a Consumer and Message
@@ -106,6 +107,10 @@ type ConsumerOptions struct {
        // ReceiverQueueSize(int) if the total exceeds this value (default: 
50000).
        MaxTotalReceiverQueueSizeAcrossPartitions int
 
+       // The delay after which to redeliver the messages that failed to be
+       // processed. Default is 1min. (See `Consumer.Nack()`)
+       NackRedeliveryDelay *time.Duration
+
        // Set the consumer name.
        Name string
 
@@ -136,10 +141,28 @@ type Consumer interface {
        Chan() <-chan ConsumerMessage
 
        // Ack the consumption of a single message
-       Ack(Message) error
+       Ack(Message)
 
        // AckID the consumption of a single message, identified by its 
MessageID
-       AckID(MessageID) error
+       AckID(MessageID)
+
+       // Acknowledge the failure to process a single message.
+       //
+       // When a message is "negatively acked" it will be marked for 
redelivery after
+       // some fixed delay. The delay is configurable when constructing the 
consumer
+       // with ConsumerOptions.NAckRedeliveryDelay .
+       //
+       // This call is not blocking.
+       Nack(Message)
+
+       // Acknowledge the failure to process a single message.
+       //
+       // When a message is "negatively acked" it will be marked for 
redelivery after
+       // some fixed delay. The delay is configurable when constructing the 
consumer
+       // with ConsumerOptions.NackRedeliveryDelay .
+       //
+       // This call is not blocking.
+       NackID(MessageID)
 
        // Close the consumer and stop the broker to push more messages
        Close() error
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 8b73631..0a628b7 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -33,6 +33,8 @@ import (
 
 var ErrConsumerClosed = errors.New("consumer closed")
 
+const defaultNackRedeliveryDelay = 1 * time.Minute
+
 type consumer struct {
        options ConsumerOptions
 
@@ -117,6 +119,13 @@ func topicSubscribe(client *client, options 
ConsumerOptions, topic string,
                wg.Add(1)
                go func(idx int, pt string) {
                        defer wg.Done()
+
+                       var nackRedeliveryDelay time.Duration
+                       if options.NackRedeliveryDelay == nil {
+                               nackRedeliveryDelay = defaultNackRedeliveryDelay
+                       } else {
+                               nackRedeliveryDelay = 
*options.NackRedeliveryDelay
+                       }
                        opts := &partitionConsumerOpts{
                                topic:               pt,
                                consumerName:        consumerName,
@@ -125,6 +134,7 @@ func topicSubscribe(client *client, options 
ConsumerOptions, topic string,
                                subscriptionInitPos: 
options.SubscriptionInitialPosition,
                                partitionIdx:        idx,
                                receiverQueueSize:   receiverQueueSize,
+                               nackRedeliveryDelay: nackRedeliveryDelay,
                        }
                        cons, err := newPartitionConsumer(consumer, client, 
opts, messageCh)
                        ch <- ConsumerError{
@@ -199,24 +209,48 @@ func (c *consumer) Chan() <-chan ConsumerMessage {
 }
 
 // Ack the consumption of a single message
-func (c *consumer) Ack(msg Message) error {
-       return c.AckID(msg.ID())
+func (c *consumer) Ack(msg Message) {
+       c.AckID(msg.ID())
 }
 
 // Ack the consumption of a single message, identified by its MessageID
-func (c *consumer) AckID(msgID MessageID) error {
+func (c *consumer) AckID(msgID MessageID) {
        mid, ok := msgID.(*messageID)
        if !ok {
-               return fmt.Errorf("invalid message id type")
+               c.log.Warnf("invalid message id type")
+               return
        }
 
        partition := mid.partitionIdx
        // did we receive a valid partition index?
        if partition < 0 || partition >= len(c.consumers) {
-               return fmt.Errorf("invalid partition index %d expected a 
partition between [0-%d]",
+               c.log.Warnf("invalid partition index %d expected a partition 
between [0-%d]",
                        partition, len(c.consumers))
+               return
        }
-       return c.consumers[partition].AckID(msgID)
+       c.consumers[partition].AckID(mid)
+}
+
+func (c *consumer) Nack(msg Message) {
+       c.AckID(msg.ID())
+}
+
+func (c *consumer) NackID(msgID MessageID) {
+       mid, ok := msgID.(*messageID)
+       if !ok {
+               c.log.Warnf("invalid message id type")
+               return
+       }
+
+       partition := mid.partitionIdx
+       // did we receive a valid partition index?
+       if partition < 0 || partition >= len(c.consumers) {
+               c.log.Warnf("invalid partition index %d expected a partition 
between [0-%d]",
+                       partition, len(c.consumers))
+               return
+       }
+
+       c.consumers[partition].NackID(mid)
 }
 
 func (c *consumer) Close() error {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index aaf6ade..a8d9b6c 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -47,6 +47,7 @@ type partitionConsumerOpts struct {
        subscriptionInitPos SubscriptionInitialPosition
        partitionIdx        int
        receiverQueueSize   int
+       nackRedeliveryDelay time.Duration
 }
 
 type partitionConsumer struct {
@@ -78,6 +79,8 @@ type partitionConsumer struct {
        connectedCh chan struct{}
        closeCh     chan struct{}
 
+       nackTracker *negativeAcksTracker
+
        log *log.Entry
 }
 
@@ -101,6 +104,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                log:            log.WithField("topic", options.topic),
        }
        pc.log = pc.log.WithField("name", pc.name).WithField("subscription", 
options.subscription)
+       pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
 
        err := pc.grabConn()
        if err != nil {
@@ -143,19 +147,39 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub 
*unsubscribeRequest) {
        pc.conn.DeleteConsumeHandler(pc.consumerID)
 }
 
-func (pc *partitionConsumer) Ack(msg Message) error {
-       return pc.AckID(msg.ID())
-}
-
-func (pc *partitionConsumer) AckID(msgID MessageID) error {
+func (pc *partitionConsumer) AckID(msgID *messageID) {
        req := &ackRequest{
-               doneCh: make(chan struct{}),
-               msgID:  msgID,
+               msgID: msgID,
        }
        pc.eventsCh <- req
+}
 
-       <-req.doneCh
-       return req.err
+func (pc *partitionConsumer) NackID(msgID *messageID) {
+       pc.nackTracker.Add(msgID)
+}
+
+func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
+       pc.eventsCh <- &redeliveryRequest{msgIds}
+}
+
+func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
+       msgIds := req.msgIds
+       pc.log.Debug("Request redelivery after negative ack for messages", 
msgIds)
+
+       msgIdDataList := make([]*pb.MessageIdData, len(msgIds))
+       for i := 0; i < len(msgIds); i++ {
+               msgIdDataList[i] = &pb.MessageIdData{
+                       LedgerId: proto.Uint64(uint64(msgIds[i].ledgerID)),
+                       EntryId:  proto.Uint64(uint64(msgIds[i].entryID)),
+               }
+       }
+
+       requestID := internal.RequestIDNoResponse
+       pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID,
+               pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, 
&pb.CommandRedeliverUnacknowledgedMessages{
+                       ConsumerId: proto.Uint64(pc.consumerID),
+                       MessageIds: msgIdDataList,
+               })
 }
 
 func (pc *partitionConsumer) Close() error {
@@ -172,28 +196,21 @@ func (pc *partitionConsumer) Close() error {
 }
 
 func (pc *partitionConsumer) internalAck(req *ackRequest) {
-       defer close(req.doneCh)
+       msgId := req.msgID
 
-       id := &pb.MessageIdData{}
-       messageIDs := make([]*pb.MessageIdData, 0)
-       err := proto.Unmarshal(req.msgID.Serialize(), id)
-       if err != nil {
-               pc.log.WithError(err).Error("unable to serialize message id")
-               req.err = err
+       messageIDs := make([]*pb.MessageIdData, 1)
+       messageIDs[0] = &pb.MessageIdData{
+               LedgerId: proto.Uint64(uint64(msgId.ledgerID)),
+               EntryId:  proto.Uint64(uint64(msgId.entryID)),
        }
-
-       messageIDs = append(messageIDs, id)
        requestID := internal.RequestIDNoResponse
        cmdAck := &pb.CommandAck{
                ConsumerId: proto.Uint64(pc.consumerID),
                MessageId:  messageIDs,
                AckType:    pb.CommandAck_Individual.Enum(),
        }
-       _, err = pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, 
pb.BaseCommand_ACK, cmdAck)
-       if err != nil {
-               pc.log.WithError(err).Errorf("failed to ack message_id=%s", id)
-               req.err = err
-       }
+
+       pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, 
pb.BaseCommand_ACK, cmdAck)
 }
 
 func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, 
headersAndPayload internal.Buffer) error {
@@ -369,9 +386,7 @@ func (pc *partitionConsumer) dispatcher() {
 }
 
 type ackRequest struct {
-       doneCh chan struct{}
-       msgID  MessageID
-       err    error
+       msgID *messageID
 }
 
 type unsubscribeRequest struct {
@@ -384,6 +399,10 @@ type closeRequest struct {
        err    error
 }
 
+type redeliveryRequest struct {
+       msgIds []messageID
+}
+
 func (pc *partitionConsumer) runEventsLoop() {
        defer func() {
                pc.log.Info("exiting events loop")
@@ -396,6 +415,8 @@ func (pc *partitionConsumer) runEventsLoop() {
                        switch v := i.(type) {
                        case *ackRequest:
                                pc.internalAck(v)
+                       case *redeliveryRequest:
+                               pc.internalRedeliver(v)
                        case *unsubscribeRequest:
                                pc.internalUnsubscribe(v)
                        case *connectionClosed:
@@ -429,6 +450,7 @@ func (pc *partitionConsumer) internalClose(req 
*closeRequest) {
                pc.log.Info("Closed consumer")
                pc.state = consumerClosed
                pc.conn.DeleteConsumeHandler(pc.consumerID)
+               pc.nackTracker.Close()
                close(pc.closeCh)
        }
 }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 33c2f15..c45b932 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -90,9 +90,7 @@ func TestProducerConsumer(t *testing.T) {
                assert.Equal(t, expectProperties, msg.Properties())
 
                // ack message
-               if err := consumer.Ack(msg); err != nil {
-                       log.Fatal(err)
-               }
+               consumer.Ack(msg)
        }
 }
 
@@ -163,8 +161,7 @@ func TestBatchMessageReceive(t *testing.T) {
        for i := 0; i < numOfMessages; i++ {
                msg, err := consumer.Receive(ctx)
                assert.Nil(t, err)
-               err = consumer.Ack(msg)
-               assert.Nil(t, err)
+               consumer.Ack(msg)
                count++
        }
 
@@ -301,17 +298,13 @@ func TestConsumerKeyShared(t *testing.T) {
                                break
                        }
                        receivedConsumer1++
-                       if err := consumer1.Ack(cm.Message); err != nil {
-                               log.Fatal(err)
-                       }
+                       consumer1.Ack(cm.Message)
                case cm, ok := <-consumer2.Chan():
                        if !ok {
                                break
                        }
                        receivedConsumer2++
-                       if err := consumer2.Ack(cm.Message); err != nil {
-                               log.Fatal(err)
-                       }
+                       consumer2.Ack(cm.Message)
                }
        }
 
@@ -372,9 +365,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
                fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
                        msg.ID(), string(msg.Payload()))
 
-               if err := consumer.Ack(msg); err != nil {
-                       assert.Nil(t, err)
-               }
+               consumer.Ack(msg)
        }
 
        assert.Equal(t, len(msgs), 10)
@@ -465,9 +456,7 @@ func TestConsumerShared(t *testing.T) {
                        payload := string(cm.Message.Payload())
                        messages[payload] = struct{}{}
                        fmt.Printf("consumer1 msg id is: %v, value is: %s\n", 
cm.Message.ID(), payload)
-                       if err := consumer1.Ack(cm.Message); err != nil {
-                               log.Fatal(err)
-                       }
+                       consumer1.Ack(cm.Message)
                case cm, ok := <-consumer2.Chan():
                        if !ok {
                                break
@@ -476,9 +465,7 @@ func TestConsumerShared(t *testing.T) {
                        payload := string(cm.Message.Payload())
                        messages[payload] = struct{}{}
                        fmt.Printf("consumer2 msg id is: %v, value is: %s\n", 
cm.Message.ID(), payload)
-                       if err := consumer2.Ack(cm.Message); err != nil {
-                               log.Fatal(err)
-                       }
+                       consumer2.Ack(cm.Message)
                }
        }
 
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index d806087..b0620c2 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -146,9 +146,9 @@ func (p *producer) LastSequenceID() int64 {
 
 func (p *producer) Flush() error {
        for _, pp := range p.producers {
-                if err :=  pp.Flush(); err != nil {
-                       return err
-                }
+               if err := pp.Flush(); err != nil {
+                       return err
+               }
 
        }
        return nil
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
new file mode 100644
index 0000000..e8a79e8
--- /dev/null
+++ b/pulsar/negative_acks_tracker.go
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       log "github.com/sirupsen/logrus"
+       "sync"
+       "time"
+)
+
+type redeliveryConsumer interface {
+       Redeliver(msgIds []messageID)
+}
+
+type negativeAcksTracker struct {
+       sync.Mutex
+
+       doneCh       chan interface{}
+       negativeAcks map[messageID]time.Time
+       rc           redeliveryConsumer
+       tick         *time.Ticker
+       delay        time.Duration
+}
+
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration) 
*negativeAcksTracker {
+       t := &negativeAcksTracker{
+               doneCh:       make(chan interface{}),
+               negativeAcks: make(map[messageID]time.Time),
+               rc:           rc,
+               tick:         time.NewTicker(delay / 3),
+               delay:        delay,
+       }
+
+       go t.track()
+       return t
+}
+
+func (t *negativeAcksTracker) Add(msgID *messageID) {
+       // Always clear up the batch index since we want to track the nack
+       // for the entire batch
+       batchMsgId := messageID{
+               ledgerID: msgID.ledgerID,
+               entryID:  msgID.entryID,
+               batchIdx: 0,
+       }
+
+       t.Lock()
+       defer t.Unlock()
+
+       _, present := t.negativeAcks[batchMsgId]
+       if present {
+               // The batch is already being tracked
+               return
+       } else {
+               targetTime := time.Now().Add(t.delay)
+               t.negativeAcks[batchMsgId] = targetTime
+       }
+}
+
+func (t *negativeAcksTracker) track() {
+       for {
+               select {
+               case <-t.doneCh:
+                       log.Debug("Closing nack tracker")
+                       return
+
+               case <-t.tick.C:
+                       {
+                               t.Lock()
+
+                               now := time.Now()
+                               msgIds := make([]messageID, 0)
+                               for msgID, targetTime := range t.negativeAcks {
+                                       log.Debugf("MsgId: %v -- targetTime: %v 
-- now: %v", msgID, targetTime, now)
+                                       if targetTime.Before(now) {
+                                               log.Debugf("Adding MsgId: %v", 
msgID)
+                                               msgIds = append(msgIds, msgID)
+                                               delete(t.negativeAcks, msgID)
+                                       }
+                               }
+
+                               t.Unlock()
+
+                               if len(msgIds) > 0 {
+                                       t.rc.Redeliver(msgIds)
+                               }
+                       }
+
+               }
+       }
+}
+
+func (t *negativeAcksTracker) Close() {
+       t.doneCh <- nil
+}
diff --git a/pulsar/negative_acks_tracker_test.go 
b/pulsar/negative_acks_tracker_test.go
new file mode 100644
index 0000000..733114a
--- /dev/null
+++ b/pulsar/negative_acks_tracker_test.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "github.com/stretchr/testify/assert"
+       "sort"
+       "sync"
+       "testing"
+       "time"
+)
+
+type nackMockedConsumer struct {
+       sync.Mutex
+       cond   *sync.Cond
+       msgIds []messageID
+}
+
+func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) {
+       nmc.Lock()
+       if nmc.msgIds == nil {
+               nmc.msgIds = msgIds
+               sort.Slice(msgIds, func(i, j int) bool {
+                       return msgIds[i].ledgerID < msgIds[j].entryID
+               })
+               nmc.cond.Signal()
+       }
+
+       nmc.Unlock()
+}
+
+func (nmc *nackMockedConsumer) Wait() []messageID {
+       nmc.Lock()
+       defer nmc.Unlock()
+       nmc.cond.Wait()
+
+       return nmc.msgIds
+}
+
+func TestNacksTracker(t *testing.T) {
+       nmc := &nackMockedConsumer{}
+       nmc.cond = sync.NewCond(nmc)
+       nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+
+       nacks.Add(&messageID{
+               ledgerID: 1,
+               entryID:  1,
+               batchIdx: 1,
+       })
+
+       nacks.Add(&messageID{
+               ledgerID: 2,
+               entryID:  2,
+               batchIdx: 1,
+       })
+
+       msgIds := nmc.Wait()
+
+       assert.Equal(t, 2, len(msgIds))
+       assert.Equal(t, int64(1), msgIds[0].ledgerID)
+       assert.Equal(t, int64(1), msgIds[0].entryID)
+       assert.Equal(t, int64(2), msgIds[1].ledgerID)
+       assert.Equal(t, int64(2), msgIds[1].entryID)
+
+       nacks.Close()
+}
+
+func TestNacksWithBatchesTracker(t *testing.T) {
+       nmc := &nackMockedConsumer{}
+       nmc.cond = sync.NewCond(nmc)
+       nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+
+       nacks.Add(&messageID{
+               ledgerID: 1,
+               entryID:  1,
+               batchIdx: 1,
+       })
+
+       nacks.Add(&messageID{
+               ledgerID: 1,
+               entryID:  1,
+               batchIdx: 2,
+       })
+
+       nacks.Add(&messageID{
+               ledgerID: 1,
+               entryID:  1,
+               batchIdx: 3,
+       })
+
+       nacks.Add(&messageID{
+               ledgerID: 2,
+               entryID:  2,
+               batchIdx: 1,
+       })
+
+       msgIds := nmc.Wait()
+
+       assert.Equal(t, 2, len(msgIds))
+       assert.Equal(t, int64(1), msgIds[0].ledgerID)
+       assert.Equal(t, int64(1), msgIds[0].entryID)
+       assert.Equal(t, int64(2), msgIds[1].ledgerID)
+       assert.Equal(t, int64(2), msgIds[1].entryID)
+
+       nacks.Close()
+}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 052e92a..1961172 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -434,8 +434,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {
                fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
                        msg.ID(), string(msg.Payload()))
                assert.Nil(t, err)
-               err = consumer.Ack(msg)
-               assert.Nil(t, err)
+               consumer.Ack(msg)
                msgCount++
        }
        assert.Equal(t, msgCount, numOfMessages/2)

Reply via email to