This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new a4842eeac feat(queue): record query and control operations in publisher metrics (#1166)
a4842eeac is described below
commit a4842eeacdb2aa6022c4101e547904698062d67a
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Jun 9 11:53:25 2026 +0800
feat(queue): record query and control operations in publisher metrics (#1166)
---
banyand/queue/pub/batch.go | 4 ++++
banyand/queue/pub/pub.go | 55 ++++++++++++++++++++++++++++++++++++++++++----
2 files changed, 55 insertions(+), 4 deletions(-)
diff --git a/banyand/queue/pub/batch.go b/banyand/queue/pub/batch.go
index 47eaebfda..0cdf12686 100644
--- a/banyand/queue/pub/batch.go
+++ b/banyand/queue/pub/batch.go
@@ -50,6 +50,10 @@ const (
// Remote-side failures (observed after the frame was written to the
stream).
sendErrReasonRecvError = "recv_error" // s.Recv returned an
error (connection/protocol layer).
sendErrReasonServerRejected = "server_rejected" // Server responded
with a non-empty Error (includes failover statuses).
+ // Failures specific to the non-batched publish path (query/control
operations).
+ sendErrReasonSendError = "send_error" // Opening the stream or
writing the first frame failed.
+ sendErrReasonDecodeError = "decode_error" // The response body could
not be decoded.
+ sendErrReasonInvalidTopic = "invalid_topic" // No response codec is
registered for the topic.
)
type writeStream struct {
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index e7d656183..c0805d254 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -372,7 +372,8 @@ type publishResult struct {
func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages
...bus.Message) (bus.Future, error) {
var err error
f := &future{
- log: p.log,
+ log: p.log,
+ metrics: p.metrics,
}
handleMessage := func(m bus.Message, err error) error {
r, errSend := messageToRequest(topic, m)
@@ -386,20 +387,37 @@ func (p *pub) publish(timeout time.Duration, topic
bus.Topic, messages ...bus.Me
r.SenderRole = p.selfRole
r.SenderTier = p.selfTier
node := m.Node()
+ group := m.Group()
+ info := p.getNodeInfo(node)
execErr := p.connMgr.Execute(node, func(c *client) error {
ctx, cancel :=
context.WithTimeout(context.Background(), timeout)
f.cancelFn = append(f.cancelFn, cancel)
stream, errCreateStream := c.client.Send(ctx)
if errCreateStream != nil {
// Record failure for circuit breaker (only for
transient/internal errors)
+ if p.metrics != nil {
+ p.metrics.totalErr.Inc(1,
apidata.OperationOf(topic), group, node, info.role, info.tier,
sendErrReasonSendError)
+ }
return fmt.Errorf("failed to get stream for
node %s: %w", node, errCreateStream)
}
if sendErr := stream.Send(r); sendErr != nil {
+ if p.metrics != nil {
+ p.metrics.totalErr.Inc(1,
apidata.OperationOf(topic), group, node, info.role, info.tier,
sendErrReasonSendError)
+ }
return fmt.Errorf("failed to send message to
node %s: %w", node, sendErr)
}
f.clients = append(f.clients, stream)
f.topics = append(f.topics, topic)
f.nodes = append(f.nodes, node)
+ f.groups = append(f.groups, group)
+ f.roles = append(f.roles, info.role)
+ f.tiers = append(f.tiers, info.tier)
+ f.starts = append(f.starts, time.Now())
+ // Started is recorded here (one stream per message);
the matching
+ // finished/latency is observed when the response is
read in Get.
+ if p.metrics != nil {
+ p.metrics.totalStarted.Inc(1,
apidata.OperationOf(topic), group, node, info.role, info.tier)
+ }
return nil
})
if execErr != nil {
@@ -553,13 +571,18 @@ func messageToRequest(topic bus.Topic, m bus.Message)
(*clusterv1.SendRequest, e
type future struct {
log *logger.Logger
+ metrics *pubMetrics
clients []clusterv1.Service_SendClient
cancelFn []func()
topics []bus.Topic
nodes []string
+ groups []string
+ roles []string
+ tiers []string
+ starts []time.Time
}
-func (l *future) Get() (bus.Message, error) {
+func (l *future) Get() (msg bus.Message, err error) {
if len(l.clients) < 1 {
return bus.Message{}, io.EOF
}
@@ -567,10 +590,15 @@ func (l *future) Get() (bus.Message, error) {
c := l.clients[0]
t := l.topics[0]
n := l.nodes[0]
+ group := l.groups[0]
+ role := l.roles[0]
+ tier := l.tiers[0]
+ start := l.starts[0]
+ errReason := sendErrReasonRecvError
defer func() {
- if err := c.CloseSend(); err != nil {
- l.log.Error().Err(err).Msg("failed to close send
stream")
+ if closeErr := c.CloseSend(); closeErr != nil {
+ l.log.Error().Err(closeErr).Msg("failed to close send
stream")
}
l.clients = l.clients[1:]
@@ -578,12 +606,29 @@ func (l *future) Get() (bus.Message, error) {
l.cancelFn[0]()
l.cancelFn = l.cancelFn[1:]
l.nodes = l.nodes[1:]
+ l.groups = l.groups[1:]
+ l.roles = l.roles[1:]
+ l.tiers = l.tiers[1:]
+ l.starts = l.starts[1:]
+
+ // The non-batched publish path (query/control) records its
finished and
+ // latency here, when the response is read; errors are split by
reason.
+ if l.metrics != nil {
+ operation := apidata.OperationOf(t)
+ if err != nil {
+ l.metrics.totalErr.Inc(1, operation, group, n,
role, tier, errReason)
+ } else {
+ l.metrics.totalFinished.Inc(1, operation,
group, n, role, tier)
+ }
+
l.metrics.totalLatency.Observe(time.Since(start).Seconds(), operation, group,
n, role, tier)
+ }
}()
resp, err := c.Recv()
if err != nil {
return bus.Message{}, err
}
if resp.Error != "" {
+ errReason = sendErrReasonServerRejected
return bus.Message{}, errors.New(resp.Error)
}
if resp.Body == nil {
@@ -592,6 +637,7 @@ func (l *future) Get() (bus.Message, error) {
if codec, ok := apidata.TopicResponseMap[t]; ok {
m, decodeErr := codec.Unmarshal(resp.Body)
if decodeErr != nil {
+ errReason = sendErrReasonDecodeError
return bus.Message{}, decodeErr
}
return bus.NewMessageWithNode(
@@ -600,6 +646,7 @@ func (l *future) Get() (bus.Message, error) {
m,
), nil
}
+ errReason = sendErrReasonInvalidTopic
return bus.Message{}, fmt.Errorf("invalid topic %s", t)
}