This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dee1133 fix: add missing metric tracking of `pulsar_client_consumer_acks` for AckIDList method (#1396)
0dee1133 is described below

commit 0dee1133982f4a017d791211787dc1c90e4b8c44
Author: Zike Yang <z...@apache.org>
AuthorDate: Thu Jul 24 19:04:04 2025 +0800

    fix: add missing metric tracking of `pulsar_client_consumer_acks` for AckIDList method (#1396)
---
 pulsar/consumer_partition.go |  3 +++
 pulsar/consumer_test.go      | 37 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 6a8cf83d..61c82bf6 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -760,6 +760,7 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
                                position := newPosition(msgID)
                                if convertedMsgID.ack() {
                                        pendingAcks[position] = nil
+                                       pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-convertedMsgID.receivedTime.UnixNano()) / 1.0e9)
                                } else if pc.options.enableBatchIndexAck {
                                        pendingAcks[position] = convertedMsgID.tracker.getAckBitSet()
                                }
@@ -785,6 +786,8 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
                return toAckError(map[error][]MessageID{errors.New("consumer state is closed"): validMsgIDs})
        }
 
+       pc.metrics.AcksCounter.Add(float64(len(validMsgIDs)))
+
        req := &ackListRequest{
                errCh:  make(chan error),
                msgIDs: toMsgIDDataList(pendingAcks),
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 2351e202..3a1276fc 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -33,6 +33,7 @@ import (
        "time"
 
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
+       "github.com/prometheus/client_golang/prometheus"
 
        "github.com/stretchr/testify/require"
        "github.com/testcontainers/testcontainers-go"
@@ -4856,8 +4857,30 @@ func TestAckIDList(t *testing.T) {
        }
 }
 
+func getAckCount(registry *prometheus.Registry) (int, error) {
+       metrics, err := registry.Gather()
+       if err != nil {
+               return 0, err
+       }
+
+       var ackCount float64
+       for _, metric := range metrics {
+               if metric.GetName() == "pulsar_client_consumer_acks" {
+                       for _, m := range metric.GetMetric() {
+                               ackCount += m.GetCounter().GetValue()
+                       }
+               }
+       }
+       return int(ackCount), nil
+}
+
 func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
-       client, err := NewClient(ClientOptions{URL: lookupURL})
+       // Create a custom metrics registry
+       registry := prometheus.NewRegistry()
+       client, err := NewClient(ClientOptions{
+               URL:               lookupURL,
+               MetricsRegisterer: registry,
+       })
        assert.Nil(t, err)
        defer client.Close()
 
@@ -4886,6 +4909,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
                msgIDs[i] = msgs[ackedIndexes[i]].ID()
        }
        assert.Nil(t, consumer.AckIDList(msgIDs))
+       ackCnt, err := getAckCount(registry)
+       expectedAckCnt := len(msgIDs)
+       assert.NoError(t, err)
+       assert.Equal(t, expectedAckCnt, ackCnt)
        consumer.Close()
 
        consumer = createSharedConsumer(t, client, topic, enableBatchIndexAck)
@@ -4900,6 +4927,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
                        msgIDs = append(msgIDs, originalMsgIDs[i])
                }
                assert.Nil(t, consumer.AckIDList(msgIDs))
+               expectedAckCnt = expectedAckCnt + len(msgIDs)
+               ackCnt, err = getAckCount(registry)
+               assert.NoError(t, err)
+               assert.Equal(t, expectedAckCnt, ackCnt)
                consumer.Close()
 
                consumer = createSharedConsumer(t, client, topic, enableBatchIndexAck)
@@ -4920,6 +4951,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
        } else {
                assert.Fail(t, "AckIDList should return AckError")
        }
+
+       ackCnt, err = getAckCount(registry)
+       assert.NoError(t, err)
+       assert.Equal(t, expectedAckCnt, ackCnt) // The Ack Counter shouldn't be increased.
 }
 
 func createSharedConsumer(t *testing.T, client Client, topic string, enableBatchIndexAck bool) Consumer {

Reply via email to