This is an automated email from the ASF dual-hosted git repository. crossoverjie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 682bf5fd [Issue 1233] Fix the issue where the AckIDCumulativ cannot return error. (#1235) 682bf5fd is described below commit 682bf5fde149754c1648fbffccd39c2c51f17551 Author: crossoverJie <crossover...@gmail.com> AuthorDate: Thu Jul 4 10:56:14 2024 +0800 [Issue 1233] Fix the issue where the AckIDCumulativ cannot return error. (#1235) --- pulsar/consumer_partition.go | 2 +- pulsar/consumer_test.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 794f8928..d8001dc1 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -730,7 +730,7 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon var ackReq *ackRequest if withResponse { - ackReq := pc.sendCumulativeAck(msgIDToAck) + ackReq = pc.sendCumulativeAck(msgIDToAck) <-ackReq.doneCh } else { pc.ackGroupingTracker.addCumulative(msgIDToAck) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 54c91ec0..2ecdf8eb 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1010,11 +1010,13 @@ func TestConsumerBatchCumulativeAck(t *testing.T) { if i == N-1 { // cumulative ack the first half of messages - c1.AckCumulative(msg) + err := c1.AckCumulative(msg) + assert.Nil(t, err) } else if i == N { // the N+1 msg is in the second batch // cumulative ack it to test if the first batch can be acked - c2.AckCumulative(msg) + err := c2.AckCumulative(msg) + assert.Nil(t, err) } } @@ -3950,7 +3952,8 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o // Acknowledge half of the messages if cumulative { msgID := msgIds[BatchingMaxSize/2-1] - consumer.AckIDCumulative(msgID) + err := consumer.AckIDCumulative(msgID) + assert.Nil(t, err) log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx()) } else { for i := 0; i < BatchingMaxSize; i++ { @@ -3985,7 +3988,8 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o } if cumulative { msgID := msgIds[BatchingMaxSize-1] - consumer.AckIDCumulative(msgID) + err := consumer.AckIDCumulative(msgID) + assert.Nil(t, err) log.Printf("Acknowledge %v:%d cumulatively\n", msgID, msgID.BatchIdx()) } consumer.Close()