This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 9e69b539 Add trace query command in bydbctl (#797)
9e69b539 is described below
commit 9e69b5397291da1e86ec181307f12acfa53e956d
Author: mrproliu <[email protected]>
AuthorDate: Thu Oct 2 16:57:25 2025 +0800
Add trace query command in bydbctl (#797)
---
banyand/liaison/grpc/discovery.go | 4 +-
bydbctl/internal/cmd/measure.go | 10 +++-
bydbctl/internal/cmd/property.go | 3 +-
bydbctl/internal/cmd/stream.go | 10 +++-
bydbctl/internal/cmd/topn.go | 10 +++-
bydbctl/internal/cmd/trace.go | 28 ++++++++--
bydbctl/internal/cmd/trace_test.go | 112 +++++++++++++++++++++++++++++++++++++
7 files changed, 161 insertions(+), 16 deletions(-)
diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index 8df965ad..acb101bd 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -234,7 +234,6 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
case schema.KindTrace:
trace := schemaMetadata.Spec.(*databasev1.Trace)
id = getID(trace.GetMetadata())
- e.traceMap[id] = trace
// Pre-compute trace ID tag index
traceIDTagName := trace.GetTraceIdTagName()
traceIDIndex := -1
@@ -244,7 +243,10 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
break
}
}
+ e.RWMutex.Lock()
+ e.traceMap[id] = trace
e.traceIDIndexMap[id] = traceIDIndex
+ e.RWMutex.Unlock()
return
default:
return
diff --git a/bydbctl/internal/cmd/measure.go b/bydbctl/internal/cmd/measure.go
index d2d7c32f..e8b9b799 100644
--- a/bydbctl/internal/cmd/measure.go
+++ b/bydbctl/internal/cmd/measure.go
@@ -28,7 +28,11 @@ import (
"github.com/apache/skywalking-banyandb/pkg/version"
)
-const measureSchemaPath = "/api/v1/measure/schema"
+const (
+ measureSchemaPath = "/api/v1/measure/schema"
+ measureQueryPath = "/api/v1/measure/data"
+ measureListPath = "/api/v1/measure/schema/lists/{group}"
+)
var measureSchemaPathWithParams = measureSchemaPath + pathTemp
@@ -132,7 +136,7 @@ func newMeasureCmd() *cobra.Command {
Short: "List measures",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/measure/schema/lists/{group}"))
+ return request.req.SetPathParam("group",
request.group).Get(getPath(measureListPath))
}, yamlPrinter, enableTLS, insecure, cert)
},
}
@@ -145,7 +149,7 @@ func newMeasureCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, _ []string) (err error) {
return rest(func() ([]reqBody, error) { return
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
- return
request.req.SetBody(request.data).Post(getPath("/api/v1/measure/data"))
+ return
request.req.SetBody(request.data).Post(getPath(measureQueryPath))
}, yamlPrinter, enableTLS, insecure, cert)
},
}
diff --git a/bydbctl/internal/cmd/property.go b/bydbctl/internal/cmd/property.go
index 7b3a1170..460e0413 100644
--- a/bydbctl/internal/cmd/property.go
+++ b/bydbctl/internal/cmd/property.go
@@ -32,6 +32,7 @@ import (
const (
propertySchemaPath = "/api/v1/property/schema"
propertyDataPath = "/api/v1/property/data/{group}/{name}/{id}"
+ propertyQueryPath = "/api/v1/property/data/query"
)
var propertySchemaPathWithParams = propertySchemaPath + pathTemp
@@ -209,7 +210,7 @@ func newPropertyCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, _ []string) (err error) {
return rest(func() ([]reqBody, error) { return
simpleParseFromYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
- return
request.req.SetBody(request.data).Post(getPath("/api/v1/property/data/query"))
+ return
request.req.SetBody(request.data).Post(getPath(propertyQueryPath))
}, yamlPrinter, enableTLS, insecure, cert)
},
}
diff --git a/bydbctl/internal/cmd/stream.go b/bydbctl/internal/cmd/stream.go
index adad7bab..6525a23d 100644
--- a/bydbctl/internal/cmd/stream.go
+++ b/bydbctl/internal/cmd/stream.go
@@ -29,7 +29,11 @@ import (
"github.com/apache/skywalking-banyandb/pkg/version"
)
-const streamSchemaPath = "/api/v1/stream/schema"
+const (
+ streamSchemaPath = "/api/v1/stream/schema"
+ streamQueryPath = "/api/v1/stream/data"
+ streamListPath = "/api/v1/stream/schema/lists/{group}"
+)
var streamSchemaPathWithParams = streamSchemaPath + pathTemp
@@ -137,7 +141,7 @@ func newStreamCmd() *cobra.Command {
Short: "List streams",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/stream/schema/lists/{group}"))
+ return request.req.SetPathParam("group",
request.group).Get(getPath(streamListPath))
}, yamlPrinter, enableTLS, insecure, cert)
},
}
@@ -150,7 +154,7 @@ func newStreamCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, _ []string) (err error) {
return rest(func() ([]reqBody, error) { return
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
- return
request.req.SetBody(request.data).Post(getPath("/api/v1/stream/data"))
+ return
request.req.SetBody(request.data).Post(getPath(streamQueryPath))
}, yamlPrinter, enableTLS, insecure, cert)
},
}
diff --git a/bydbctl/internal/cmd/topn.go b/bydbctl/internal/cmd/topn.go
index d7fa90a9..5c30f892 100644
--- a/bydbctl/internal/cmd/topn.go
+++ b/bydbctl/internal/cmd/topn.go
@@ -28,7 +28,11 @@ import (
"github.com/apache/skywalking-banyandb/pkg/version"
)
-const topnSchemaPath = "/api/v1/topn-agg/schema"
+const (
+ topnSchemaPath = "/api/v1/topn-agg/schema"
+ topnQueryPath = "/api/v1/measure/topn"
+ topnListPath = "/api/v1/topn-agg/schema/lists/{group}"
+)
var topnSchemaPathWithParams = topnSchemaPath + pathTemp
@@ -136,7 +140,7 @@ func newTopnCmd() *cobra.Command {
Short: "List topn",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/topn-agg/schema/lists/{group}"))
+ return request.req.SetPathParam("group",
request.group).Get(getPath(topnListPath))
}, yamlPrinter, enableTLS, insecure, cert)
},
}
@@ -150,7 +154,7 @@ func newTopnCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, _ []string) (err error) {
return rest(func() ([]reqBody, error) { return
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
- return
request.req.SetBody(request.data).Post(getPath("/api/v1/measure/topn"))
+ return
request.req.SetBody(request.data).Post(getPath(topnQueryPath))
}, yamlPrinter, enableTLS, insecure, cert)
},
}
diff --git a/bydbctl/internal/cmd/trace.go b/bydbctl/internal/cmd/trace.go
index 3318a424..53858f81 100644
--- a/bydbctl/internal/cmd/trace.go
+++ b/bydbctl/internal/cmd/trace.go
@@ -28,7 +28,11 @@ import (
"github.com/apache/skywalking-banyandb/pkg/version"
)
-const traceSchemaPath = "/api/v1/trace/schema"
+const (
+ traceSchemaPath = "/api/v1/trace/schema"
+ traceQueryPath = "/api/v1/trace/data"
+ traceListPath = "/api/v1/trace/schema/lists/{group}"
+)
var traceSchemaPathWithParams = traceSchemaPath + pathTemp
@@ -132,13 +136,27 @@ func newTraceCmd() *cobra.Command {
Short: "List traces",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/trace/schema/lists/{group}"))
+ return request.req.SetPathParam("group",
request.group).Get(getPath(traceListPath))
}, yamlPrinter, enableTLS, insecure, cert)
},
}
- bindFileFlag(createCmd, updateCmd)
- bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd)
- traceCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd)
+ queryCmd := &cobra.Command{
+ Use: "query [-s start_time] [-e end_time] -f [file|dir|-]",
+ Version: version.Build(),
+ Short: "Query data in a trace",
+ Long: timeRangeUsage,
+ RunE: func(cmd *cobra.Command, _ []string) (err error) {
+ return rest(func() ([]reqBody, error) { return
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
+ func(request request) (*resty.Response, error) {
+ return
request.req.SetBody(request.data).Post(getPath(traceQueryPath))
+ }, yamlPrinter, enableTLS, insecure, cert)
+ },
+ }
+
+ bindFileFlag(createCmd, updateCmd, queryCmd)
+ bindTimeRangeFlag(queryCmd)
+ bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd,
queryCmd)
+ traceCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd,
queryCmd)
return traceCmd
}
diff --git a/bydbctl/internal/cmd/trace_test.go b/bydbctl/internal/cmd/trace_test.go
index 16934834..18b33a2a 100644
--- a/bydbctl/internal/cmd/trace_test.go
+++ b/bydbctl/internal/cmd/trace_test.go
@@ -18,18 +18,25 @@
package cmd_test
import (
+ "fmt"
"strings"
+ "time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/spf13/cobra"
"github.com/zenizh/go-capturer"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ tracev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
"github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ cases_trace_data
"github.com/apache/skywalking-banyandb/test/cases/trace/data"
)
var _ = Describe("Trace Schema Operation", func() {
@@ -184,3 +191,108 @@ timestamp_tag_name: timestamp`))
deferFunc()
})
})
+
+var _ = Describe("Trace Data Query", func() {
+ var addr, grpcAddr string
+ var deferFunc func()
+ var rootCmd *cobra.Command
+ var now time.Time
+ var nowStr, endStr string
+ var interval time.Duration
+ BeforeEach(func() {
+ var err error
+ now, err = time.ParseInLocation("2006-01-02T15:04:05",
"2021-09-01T23:30:00", time.Local)
+ Expect(err).NotTo(HaveOccurred())
+ nowStr = now.Format(time.RFC3339)
+ interval = 500 * time.Millisecond
+ endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
+ grpcAddr, addr, deferFunc = setup.Standalone()
+ addr = httpSchema + addr
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ })
+
+ It("query trace all data", func() {
+ conn, err := grpclib.NewClient(
+ grpcAddr,
+
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+ )
+ Expect(err).NotTo(HaveOccurred())
+
+ cases_trace_data.Write(conn, "sw", now, interval)
+ rootCmd.SetArgs([]string{"trace", "query", "-a", addr, "-f",
"-"})
+ issue := func() string {
+ sprintf := fmt.Sprintf(`
+name: sw
+groups: ["test-trace-group"]
+timeRange:
+ begin: %s
+ end: %s
+limit: 100
+orderBy:
+ indexRuleName: "timestamp"
+ sort: SORT_DESC`, nowStr, endStr)
+ rootCmd.SetIn(strings.NewReader(sprintf))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ }
+ Eventually(issue,
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+ Eventually(func() int {
+ out := issue()
+ resp := new(tracev1.QueryResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ GinkgoWriter.Println(resp)
+ return len(resp.Traces)
+ }, flags.EventuallyTimeout).Should(Equal(5))
+ })
+
+ DescribeTable("query trace data with time range flags", func(timeArgs
...string) {
+ conn, err := grpclib.NewClient(
+ grpcAddr,
+
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ now := timestamp.NowMilli()
+ interval := -1 * time.Millisecond
+ cases_trace_data.Write(conn, "sw", now, interval)
+ args := []string{"trace", "query", "-a", addr}
+ args = append(args, timeArgs...)
+ args = append(args, "-f", "-")
+ rootCmd.SetArgs(args)
+ issue := func() string {
+ rootCmd.SetIn(strings.NewReader(`
+name: sw
+groups: ["test-trace-group"]
+limit: 100
+orderBy:
+ indexRuleName: "timestamp"
+ sort: SORT_DESC`))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ }
+ Eventually(issue,
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+ Eventually(func() int {
+ out := issue()
+ resp := new(tracev1.QueryResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ GinkgoWriter.Println(resp)
+ return len(resp.Traces)
+ }, flags.EventuallyTimeout).Should(Equal(5))
+ },
+ Entry("relative start", "--start", "-30m"),
+ Entry("relative end", "--end", "0m"),
+ Entry("absolute start", "--start", nowStr),
+ Entry("absolute end", "--end", endStr),
+ Entry("default"),
+ Entry("all relative", "--start", "-30m", "--end", "0m"),
+ Entry("all absolute", "--start", nowStr, "--end", endStr),
+ )
+
+ AfterEach(func() {
+ deferFunc()
+ })
+})