This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new 0dee1133 fix: add missing metric tracking of `pulsar_client_consumer_acks` for AckIDList method (#1396)
0dee1133 is described below

commit 0dee1133982f4a017d791211787dc1c90e4b8c44
Author: Zike Yang <z...@apache.org>
AuthorDate: Thu Jul 24 19:04:04 2025 +0800

    fix: add missing metric tracking of `pulsar_client_consumer_acks` for AckIDList method (#1396)
---
 pulsar/consumer_partition.go |  3 +++
 pulsar/consumer_test.go      | 37 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 6a8cf83d..61c82bf6 100644
--- pulsar/consumer_partition.go
+++ pulsar/consumer_partition.go
@@ -760,6 +760,7 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
 			position := newPosition(msgID)
 			if convertedMsgID.ack() {
 				pendingAcks[position] = nil
+				pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-convertedMsgID.receivedTime.UnixNano()) / 1.0e9)
 			} else if pc.options.enableBatchIndexAck {
 				pendingAcks[position] = convertedMsgID.tracker.getAckBitSet()
 			}
@@ -785,6 +786,8 @@ func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) error {
 		return toAckError(map[error][]MessageID{errors.New("consumer state is closed"): validMsgIDs})
 	}
 
+	pc.metrics.AcksCounter.Add(float64(len(validMsgIDs)))
+
 	req := &ackListRequest{
 		errCh:  make(chan error),
 		msgIDs: toMsgIDDataList(pendingAcks),
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 2351e202..3a1276fc 100644
--- pulsar/consumer_test.go
+++ pulsar/consumer_test.go
@@ -33,6 +33,7 @@ import (
 	"time"
 
 	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 	"github.com/testcontainers/testcontainers-go"
 
@@ -4856,8 +4857,30 @@ func TestAckIDList(t *testing.T) {
 	}
 }
 
+func getAckCount(registry *prometheus.Registry) (int, error) {
+	metrics, err := registry.Gather()
+	if err != nil {
+		return 0, err
+	}
+
+	var ackCount float64
+	for _, metric := range metrics {
+		if metric.GetName() == "pulsar_client_consumer_acks" {
+			for _, m := range metric.GetMetric() {
+				ackCount += m.GetCounter().GetValue()
+			}
+		}
+	}
+	return int(ackCount), nil
+}
+
 func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
-	client, err := NewClient(ClientOptions{URL: lookupURL})
+	// Create a custom metrics registry
+	registry := prometheus.NewRegistry()
+	client, err := NewClient(ClientOptions{
+		URL:               lookupURL,
+		MetricsRegisterer: registry,
+	})
 	assert.Nil(t, err)
 	defer client.Close()
 
@@ -4886,6 +4909,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
 		msgIDs[i] = msgs[ackedIndexes[i]].ID()
 	}
 	assert.Nil(t, consumer.AckIDList(msgIDs))
+	ackCnt, err := getAckCount(registry)
+	expectedAckCnt := len(msgIDs)
+	assert.NoError(t, err)
+	assert.Equal(t, expectedAckCnt, ackCnt)
 	consumer.Close()
 
 	consumer = createSharedConsumer(t, client, topic, enableBatchIndexAck)
@@ -4900,6 +4927,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
 		msgIDs = append(msgIDs, originalMsgIDs[i])
 	}
 	assert.Nil(t, consumer.AckIDList(msgIDs))
+	expectedAckCnt = expectedAckCnt + len(msgIDs)
+	ackCnt, err = getAckCount(registry)
+	assert.NoError(t, err)
+	assert.Equal(t, expectedAckCnt, ackCnt)
 	consumer.Close()
 
 	consumer = createSharedConsumer(t, client, topic, enableBatchIndexAck)
@@ -4920,6 +4951,10 @@ func runAckIDListTest(t *testing.T, enableBatchIndexAck bool) {
 	} else {
 		assert.Fail(t, "AckIDList should return AckError")
 	}
+
+	ackCnt, err = getAckCount(registry)
+	assert.NoError(t, err)
+	assert.Equal(t, expectedAckCnt, ackCnt) // The Ack Counter shouldn't be increased.
 }
 
 func createSharedConsumer(t *testing.T, client Client, topic string, enableBatchIndexAck bool) Consumer {