This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new a4842eeac feat(queue): record query and control operations in publisher metrics (#1166)
a4842eeac is described below

commit a4842eeacdb2aa6022c4101e547904698062d67a
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Jun 9 11:53:25 2026 +0800

    feat(queue): record query and control operations in publisher metrics (#1166)
---
 banyand/queue/pub/batch.go |  4 ++++
 banyand/queue/pub/pub.go   | 55 ++++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 55 insertions(+), 4 deletions(-)

diff --git a/banyand/queue/pub/batch.go b/banyand/queue/pub/batch.go
index 47eaebfda..0cdf12686 100644
--- a/banyand/queue/pub/batch.go
+++ b/banyand/queue/pub/batch.go
@@ -50,6 +50,10 @@ const (
        // Remote-side failures (observed after the frame was written to the 
stream).
        sendErrReasonRecvError      = "recv_error"      // s.Recv returned an 
error (connection/protocol layer).
        sendErrReasonServerRejected = "server_rejected" // Server responded 
with a non-empty Error (includes failover statuses).
+       // Failures specific to the non-batched publish path (query/control 
operations).
+       sendErrReasonSendError    = "send_error"    // Opening the stream or 
writing the first frame failed.
+       sendErrReasonDecodeError  = "decode_error"  // The response body could 
not be decoded.
+       sendErrReasonInvalidTopic = "invalid_topic" // No response codec is 
registered for the topic.
 )
 
 type writeStream struct {
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index e7d656183..c0805d254 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -372,7 +372,8 @@ type publishResult struct {
 func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages 
...bus.Message) (bus.Future, error) {
        var err error
        f := &future{
-               log: p.log,
+               log:     p.log,
+               metrics: p.metrics,
        }
        handleMessage := func(m bus.Message, err error) error {
                r, errSend := messageToRequest(topic, m)
@@ -386,20 +387,37 @@ func (p *pub) publish(timeout time.Duration, topic 
bus.Topic, messages ...bus.Me
                r.SenderRole = p.selfRole
                r.SenderTier = p.selfTier
                node := m.Node()
+               group := m.Group()
+               info := p.getNodeInfo(node)
                execErr := p.connMgr.Execute(node, func(c *client) error {
                        ctx, cancel := 
context.WithTimeout(context.Background(), timeout)
                        f.cancelFn = append(f.cancelFn, cancel)
                        stream, errCreateStream := c.client.Send(ctx)
                        if errCreateStream != nil {
                                // Record failure for circuit breaker (only for 
transient/internal errors)
+                               if p.metrics != nil {
+                                       p.metrics.totalErr.Inc(1, 
apidata.OperationOf(topic), group, node, info.role, info.tier, 
sendErrReasonSendError)
+                               }
                                return fmt.Errorf("failed to get stream for 
node %s: %w", node, errCreateStream)
                        }
                        if sendErr := stream.Send(r); sendErr != nil {
+                               if p.metrics != nil {
+                                       p.metrics.totalErr.Inc(1, 
apidata.OperationOf(topic), group, node, info.role, info.tier, 
sendErrReasonSendError)
+                               }
                                return fmt.Errorf("failed to send message to 
node %s: %w", node, sendErr)
                        }
                        f.clients = append(f.clients, stream)
                        f.topics = append(f.topics, topic)
                        f.nodes = append(f.nodes, node)
+                       f.groups = append(f.groups, group)
+                       f.roles = append(f.roles, info.role)
+                       f.tiers = append(f.tiers, info.tier)
+                       f.starts = append(f.starts, time.Now())
+                       // Started is recorded here (one stream per message); 
the matching
+                       // finished/latency is observed when the response is 
read in Get.
+                       if p.metrics != nil {
+                               p.metrics.totalStarted.Inc(1, 
apidata.OperationOf(topic), group, node, info.role, info.tier)
+                       }
                        return nil
                })
                if execErr != nil {
@@ -553,13 +571,18 @@ func messageToRequest(topic bus.Topic, m bus.Message) 
(*clusterv1.SendRequest, e
 
 type future struct {
        log      *logger.Logger
+       metrics  *pubMetrics // nil skips metric recording in Get
        clients  []clusterv1.Service_SendClient
        cancelFn []func()
        topics   []bus.Topic
        nodes    []string
+       groups   []string    // parallel to clients: group metric label
+       roles    []string    // parallel to clients: node role metric label
+       tiers    []string    // parallel to clients: node tier metric label
+       starts   []time.Time // parallel to clients: send time for latency
 }
 
-func (l *future) Get() (bus.Message, error) {
+func (l *future) Get() (msg bus.Message, err error) {
        if len(l.clients) < 1 {
                return bus.Message{}, io.EOF
        }
@@ -567,10 +590,15 @@ func (l *future) Get() (bus.Message, error) {
        c := l.clients[0]
        t := l.topics[0]
        n := l.nodes[0]
+       group := l.groups[0]
+       role := l.roles[0]
+       tier := l.tiers[0]
+       start := l.starts[0]
 
+       errReason := sendErrReasonRecvError
        defer func() {
-               if err := c.CloseSend(); err != nil {
-                       l.log.Error().Err(err).Msg("failed to close send 
stream")
+               if closeErr := c.CloseSend(); closeErr != nil {
+                       l.log.Error().Err(closeErr).Msg("failed to close send 
stream")
                }
 
                l.clients = l.clients[1:]
@@ -578,12 +606,29 @@ func (l *future) Get() (bus.Message, error) {
                l.cancelFn[0]()
                l.cancelFn = l.cancelFn[1:]
                l.nodes = l.nodes[1:]
+               l.groups = l.groups[1:]
+               l.roles = l.roles[1:]
+               l.tiers = l.tiers[1:]
+               l.starts = l.starts[1:]
+
+               // The non-batched publish path (query/control) records its 
finished and
+               // latency here, when the response is read; errors are split by 
reason.
+               if l.metrics != nil {
+                       operation := apidata.OperationOf(t)
+                       if err != nil {
+                               l.metrics.totalErr.Inc(1, operation, group, n, 
role, tier, errReason)
+                       } else {
+                               l.metrics.totalFinished.Inc(1, operation, 
group, n, role, tier)
+                       }
+                       
l.metrics.totalLatency.Observe(time.Since(start).Seconds(), operation, group, 
n, role, tier)
+               }
        }()
        resp, err := c.Recv()
        if err != nil {
                return bus.Message{}, err
        }
        if resp.Error != "" {
+               errReason = sendErrReasonServerRejected
                return bus.Message{}, errors.New(resp.Error)
        }
        if resp.Body == nil {
@@ -592,6 +637,7 @@ func (l *future) Get() (bus.Message, error) {
        if codec, ok := apidata.TopicResponseMap[t]; ok {
                m, decodeErr := codec.Unmarshal(resp.Body)
                if decodeErr != nil {
+                       errReason = sendErrReasonDecodeError
                        return bus.Message{}, decodeErr
                }
                return bus.NewMessageWithNode(
@@ -600,6 +646,7 @@ func (l *future) Get() (bus.Message, error) {
                        m,
                ), nil
        }
+       errReason = sendErrReasonInvalidTopic
        return bus.Message{}, fmt.Errorf("invalid topic %s", t)
 }
 

Reply via email to