This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d65d0ec feat: Add Creation and Last Publish Timestamps to Topic Stats 
(PIP-431) (#1451)
9d65d0ec is described below

commit 9d65d0ecf43f357a7a318c361553b258d88c56ea
Author: Penghui Li <[email protected]>
AuthorDate: Tue Dec 16 18:15:58 2025 -0800

    feat: Add Creation and Last Publish Timestamps to Topic Stats (PIP-431) 
(#1451)
---
 pulsaradmin/pkg/admin/topic_test.go | 100 ++++++++++++++++++++++++++++++++++++
 pulsaradmin/pkg/utils/data.go       |  54 ++++++++++---------
 2 files changed, 129 insertions(+), 25 deletions(-)

diff --git a/pulsaradmin/pkg/admin/topic_test.go 
b/pulsaradmin/pkg/admin/topic_test.go
index f40c0bbc..5655930d 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -1554,3 +1554,103 @@ func TestTopics_PublishRate(t *testing.T) {
                100*time.Millisecond,
        )
 }
+
+func TestTopicStatsTimestamps(t *testing.T) {
+       topic := 
fmt.Sprintf("persistent://public/default/test-topic-stats-timestamps-%d", 
time.Now().Nanosecond())
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+
+       // Create topic
+       err = admin.Topics().Create(*topicName, 0)
+       assert.NoError(t, err)
+       defer admin.Topics().Delete(*topicName, true, true)
+
+       // Get stats and verify CreationTimestamp
+       stats, err := admin.Topics().GetStats(*topicName)
+       assert.NoError(t, err)
+       assert.Greater(t, stats.TopicCreationTimeStamp, int64(0), 
"CreationTimestamp should be greater than 0")
+       // LastPublishTimestamp should be 0 before any message is published
+       assert.Equal(t, int64(0), stats.LastPublishTimestamp, 
"LastPublishTimestamp should be 0 initially")
+
+       // Publish a message
+       client, err := pulsar.NewClient(pulsar.ClientOptions{
+               URL: lookupURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       producer, err := client.CreateProducer(pulsar.ProducerOptions{
+               Topic: topic,
+       })
+       assert.NoError(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       _, err = producer.Send(ctx, &pulsar.ProducerMessage{
+               Payload: []byte("test-message"),
+       })
+       assert.NoError(t, err)
+
+       // Wait for stats to update (stats are updated asynchronously in broker)
+       assert.Eventually(t, func() bool {
+               s, err := admin.Topics().GetStats(*topicName)
+               if err != nil {
+                       return false
+               }
+               return s.LastPublishTimestamp > 0
+       }, 10*time.Second, 500*time.Millisecond, "LastPublishTimestamp should 
update after publishing")
+}
+
+func TestPartitionedTopicStatsTimestamps(t *testing.T) {
+       topic := 
fmt.Sprintf("persistent://public/default/test-partitioned-topic-stats-timestamps-%d",
 time.Now().Nanosecond())
+       cfg := &config.Config{}
+       admin, err := New(cfg)
+       assert.NoError(t, err)
+
+       topicName, err := utils.GetTopicName(topic)
+       assert.NoError(t, err)
+
+       // Create partitioned topic
+       err = admin.Topics().Create(*topicName, 2)
+       assert.NoError(t, err)
+       defer admin.Topics().Delete(*topicName, true, true)
+
+       // Get partitioned stats and verify CreationTimestamp
+       stats, err := admin.Topics().GetPartitionedStats(*topicName, true)
+       assert.NoError(t, err)
+       assert.Greater(t, stats.TopicCreationTimeStamp, int64(0), 
"CreationTimestamp should be greater than 0")
+       assert.Equal(t, int64(0), stats.LastPublishTimestamp, 
"LastPublishTimestamp should be 0 initially")
+
+       // Publish a message
+       client, err := pulsar.NewClient(pulsar.ClientOptions{
+               URL: lookupURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       producer, err := client.CreateProducer(pulsar.ProducerOptions{
+               Topic: topic,
+       })
+       assert.NoError(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       _, err = producer.Send(ctx, &pulsar.ProducerMessage{
+               Payload: []byte("test-message"),
+       })
+       assert.NoError(t, err)
+
+       // Wait for stats to update
+       assert.Eventually(t, func() bool {
+               s, err := admin.Topics().GetPartitionedStats(*topicName, true)
+               if err != nil {
+                       return false
+               }
+               // Partitioned stats LastPublishTimestamp is usually an 
aggregation or max of partitions
+               return s.LastPublishTimestamp > 0
+       }, 10*time.Second, 500*time.Millisecond, "LastPublishTimestamp should 
update after publishing")
+}
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index 441aa33b..43f0ec62 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -229,19 +229,21 @@ type NamespacesData struct {
 }
 
 type TopicStats struct {
-       BacklogSize         int64                        `json:"backlogSize"`
-       MsgCounterIn        int64                        `json:"msgInCounter"`
-       MsgCounterOut       int64                        `json:"msgOutCounter"`
-       MsgRateIn           float64                      `json:"msgRateIn"`
-       MsgRateOut          float64                      `json:"msgRateOut"`
-       MsgThroughputIn     float64                      
`json:"msgThroughputIn"`
-       MsgThroughputOut    float64                      
`json:"msgThroughputOut"`
-       AverageMsgSize      float64                      `json:"averageMsgSize"`
-       StorageSize         int64                        `json:"storageSize"`
-       Publishers          []PublisherStats             `json:"publishers"`
-       Subscriptions       map[string]SubscriptionStats `json:"subscriptions"`
-       Replication         map[string]ReplicatorStats   `json:"replication"`
-       DeDuplicationStatus string                       
`json:"deduplicationStatus"`
+       BacklogSize            int64                        `json:"backlogSize"`
+       MsgCounterIn           int64                        
`json:"msgInCounter"`
+       MsgCounterOut          int64                        
`json:"msgOutCounter"`
+       MsgRateIn              float64                      `json:"msgRateIn"`
+       MsgRateOut             float64                      `json:"msgRateOut"`
+       MsgThroughputIn        float64                      
`json:"msgThroughputIn"`
+       MsgThroughputOut       float64                      
`json:"msgThroughputOut"`
+       AverageMsgSize         float64                      
`json:"averageMsgSize"`
+       StorageSize            int64                        `json:"storageSize"`
+       Publishers             []PublisherStats             `json:"publishers"`
+       Subscriptions          map[string]SubscriptionStats 
`json:"subscriptions"`
+       Replication            map[string]ReplicatorStats   `json:"replication"`
+       DeDuplicationStatus    string                       
`json:"deduplicationStatus"`
+       TopicCreationTimeStamp int64                        
`json:"topicCreationTimeStamp,omitempty"`
+       LastPublishTimestamp   int64                        
`json:"lastPublishTimestamp,omitempty"`
 }
 
 type ProducerAccessMode string
@@ -410,18 +412,20 @@ type CursorStats struct {
 }
 
 type PartitionedTopicStats struct {
-       MsgRateIn           float64                      `json:"msgRateIn"`
-       MsgRateOut          float64                      `json:"msgRateOut"`
-       MsgThroughputIn     float64                      
`json:"msgThroughputIn"`
-       MsgThroughputOut    float64                      
`json:"msgThroughputOut"`
-       AverageMsgSize      float64                      `json:"averageMsgSize"`
-       StorageSize         int64                        `json:"storageSize"`
-       Publishers          []PublisherStats             `json:"publishers"`
-       Subscriptions       map[string]SubscriptionStats `json:"subscriptions"`
-       Replication         map[string]ReplicatorStats   `json:"replication"`
-       DeDuplicationStatus string                       
`json:"deduplicationStatus"`
-       Metadata            PartitionedTopicMetadata     `json:"metadata"`
-       Partitions          map[string]TopicStats        `json:"partitions"`
+       MsgRateIn              float64                      `json:"msgRateIn"`
+       MsgRateOut             float64                      `json:"msgRateOut"`
+       MsgThroughputIn        float64                      
`json:"msgThroughputIn"`
+       MsgThroughputOut       float64                      
`json:"msgThroughputOut"`
+       AverageMsgSize         float64                      
`json:"averageMsgSize"`
+       StorageSize            int64                        `json:"storageSize"`
+       Publishers             []PublisherStats             `json:"publishers"`
+       Subscriptions          map[string]SubscriptionStats 
`json:"subscriptions"`
+       Replication            map[string]ReplicatorStats   `json:"replication"`
+       DeDuplicationStatus    string                       
`json:"deduplicationStatus"`
+       Metadata               PartitionedTopicMetadata     `json:"metadata"`
+       Partitions             map[string]TopicStats        `json:"partitions"`
+       TopicCreationTimeStamp int64                        
`json:"topicCreationTimeStamp,omitempty"`
+       LastPublishTimestamp   int64                        
`json:"lastPublishTimestamp,omitempty"`
 }
 
 type SchemaData struct {

Reply via email to