This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e69b539 Add trace query command in bydbctl (#797)
9e69b539 is described below

commit 9e69b5397291da1e86ec181307f12acfa53e956d
Author: mrproliu <[email protected]>
AuthorDate: Thu Oct 2 16:57:25 2025 +0800

    Add trace query command in bydbctl (#797)
---
 banyand/liaison/grpc/discovery.go  |   4 +-
 bydbctl/internal/cmd/measure.go    |  10 +++-
 bydbctl/internal/cmd/property.go   |   3 +-
 bydbctl/internal/cmd/stream.go     |  10 +++-
 bydbctl/internal/cmd/topn.go       |  10 +++-
 bydbctl/internal/cmd/trace.go      |  28 ++++++++--
 bydbctl/internal/cmd/trace_test.go | 112 +++++++++++++++++++++++++++++++++++++
 7 files changed, 161 insertions(+), 16 deletions(-)

diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index 8df965ad..acb101bd 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -234,7 +234,6 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
        case schema.KindTrace:
                trace := schemaMetadata.Spec.(*databasev1.Trace)
                id = getID(trace.GetMetadata())
-               e.traceMap[id] = trace
                // Pre-compute trace ID tag index
                traceIDTagName := trace.GetTraceIdTagName()
                traceIDIndex := -1
@@ -244,7 +243,10 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
                                break
                        }
                }
+               e.RWMutex.Lock()
+               e.traceMap[id] = trace
                e.traceIDIndexMap[id] = traceIDIndex
+               e.RWMutex.Unlock()
                return
        default:
                return
diff --git a/bydbctl/internal/cmd/measure.go b/bydbctl/internal/cmd/measure.go
index d2d7c32f..e8b9b799 100644
--- a/bydbctl/internal/cmd/measure.go
+++ b/bydbctl/internal/cmd/measure.go
@@ -28,7 +28,11 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-const measureSchemaPath = "/api/v1/measure/schema"
+const (
+       measureSchemaPath = "/api/v1/measure/schema"
+       measureQueryPath  = "/api/v1/measure/data"
+       measureListPath   = "/api/v1/measure/schema/lists/{group}"
+)
 
 var measureSchemaPathWithParams = measureSchemaPath + pathTemp
 
@@ -132,7 +136,7 @@ func newMeasureCmd() *cobra.Command {
                Short:   "List measures",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
                        return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
-                               return request.req.SetPathParam("group", 
request.group).Get(getPath("/api/v1/measure/schema/lists/{group}"))
+                               return request.req.SetPathParam("group", 
request.group).Get(getPath(measureListPath))
                        }, yamlPrinter, enableTLS, insecure, cert)
                },
        }
@@ -145,7 +149,7 @@ func newMeasureCmd() *cobra.Command {
                RunE: func(cmd *cobra.Command, _ []string) (err error) {
                        return rest(func() ([]reqBody, error) { return 
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
                                func(request request) (*resty.Response, error) {
-                                       return 
request.req.SetBody(request.data).Post(getPath("/api/v1/measure/data"))
+                                       return 
request.req.SetBody(request.data).Post(getPath(measureQueryPath))
                                }, yamlPrinter, enableTLS, insecure, cert)
                },
        }
diff --git a/bydbctl/internal/cmd/property.go b/bydbctl/internal/cmd/property.go
index 7b3a1170..460e0413 100644
--- a/bydbctl/internal/cmd/property.go
+++ b/bydbctl/internal/cmd/property.go
@@ -32,6 +32,7 @@ import (
 const (
        propertySchemaPath = "/api/v1/property/schema"
        propertyDataPath   = "/api/v1/property/data/{group}/{name}/{id}"
+       propertyQueryPath  = "/api/v1/property/data/query"
 )
 
 var propertySchemaPathWithParams = propertySchemaPath + pathTemp
@@ -209,7 +210,7 @@ func newPropertyCmd() *cobra.Command {
                RunE: func(cmd *cobra.Command, _ []string) (err error) {
                        return rest(func() ([]reqBody, error) { return 
simpleParseFromYAML(cmd.InOrStdin()) },
                                func(request request) (*resty.Response, error) {
-                                       return 
request.req.SetBody(request.data).Post(getPath("/api/v1/property/data/query"))
+                                       return 
request.req.SetBody(request.data).Post(getPath(propertyQueryPath))
                                }, yamlPrinter, enableTLS, insecure, cert)
                },
        }
diff --git a/bydbctl/internal/cmd/stream.go b/bydbctl/internal/cmd/stream.go
index adad7bab..6525a23d 100644
--- a/bydbctl/internal/cmd/stream.go
+++ b/bydbctl/internal/cmd/stream.go
@@ -29,7 +29,11 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-const streamSchemaPath = "/api/v1/stream/schema"
+const (
+       streamSchemaPath = "/api/v1/stream/schema"
+       streamQueryPath  = "/api/v1/stream/data"
+       streamListPath   = "/api/v1/stream/schema/lists/{group}"
+)
 
 var streamSchemaPathWithParams = streamSchemaPath + pathTemp
 
@@ -137,7 +141,7 @@ func newStreamCmd() *cobra.Command {
                Short:   "List streams",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
                        return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
-                               return request.req.SetPathParam("group", 
request.group).Get(getPath("/api/v1/stream/schema/lists/{group}"))
+                               return request.req.SetPathParam("group", 
request.group).Get(getPath(streamListPath))
                        }, yamlPrinter, enableTLS, insecure, cert)
                },
        }
@@ -150,7 +154,7 @@ func newStreamCmd() *cobra.Command {
                RunE: func(cmd *cobra.Command, _ []string) (err error) {
                        return rest(func() ([]reqBody, error) { return 
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
                                func(request request) (*resty.Response, error) {
-                                       return 
request.req.SetBody(request.data).Post(getPath("/api/v1/stream/data"))
+                                       return 
request.req.SetBody(request.data).Post(getPath(streamQueryPath))
                                }, yamlPrinter, enableTLS, insecure, cert)
                },
        }
diff --git a/bydbctl/internal/cmd/topn.go b/bydbctl/internal/cmd/topn.go
index d7fa90a9..5c30f892 100644
--- a/bydbctl/internal/cmd/topn.go
+++ b/bydbctl/internal/cmd/topn.go
@@ -28,7 +28,11 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-const topnSchemaPath = "/api/v1/topn-agg/schema"
+const (
+       topnSchemaPath = "/api/v1/topn-agg/schema"
+       topnQueryPath  = "/api/v1/measure/topn"
+       topnListPath   = "/api/v1/topn-agg/schema/lists/{group}"
+)
 
 var topnSchemaPathWithParams = topnSchemaPath + pathTemp
 
@@ -136,7 +140,7 @@ func newTopnCmd() *cobra.Command {
                Short:   "List topn",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
                        return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
-                               return request.req.SetPathParam("group", 
request.group).Get(getPath("/api/v1/topn-agg/schema/lists/{group}"))
+                               return request.req.SetPathParam("group", 
request.group).Get(getPath(topnListPath))
                        }, yamlPrinter, enableTLS, insecure, cert)
                },
        }
@@ -150,7 +154,7 @@ func newTopnCmd() *cobra.Command {
                RunE: func(cmd *cobra.Command, _ []string) (err error) {
                        return rest(func() ([]reqBody, error) { return 
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
                                func(request request) (*resty.Response, error) {
-                                       return 
request.req.SetBody(request.data).Post(getPath("/api/v1/measure/topn"))
+                                       return 
request.req.SetBody(request.data).Post(getPath(topnQueryPath))
                                }, yamlPrinter, enableTLS, insecure, cert)
                },
        }
diff --git a/bydbctl/internal/cmd/trace.go b/bydbctl/internal/cmd/trace.go
index 3318a424..53858f81 100644
--- a/bydbctl/internal/cmd/trace.go
+++ b/bydbctl/internal/cmd/trace.go
@@ -28,7 +28,11 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-const traceSchemaPath = "/api/v1/trace/schema"
+const (
+       traceSchemaPath = "/api/v1/trace/schema"
+       traceQueryPath  = "/api/v1/trace/data"
+       traceListPath   = "/api/v1/trace/schema/lists/{group}"
+)
 
 var traceSchemaPathWithParams = traceSchemaPath + pathTemp
 
@@ -132,13 +136,27 @@ func newTraceCmd() *cobra.Command {
                Short:   "List traces",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
                        return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
-                               return request.req.SetPathParam("group", 
request.group).Get(getPath("/api/v1/trace/schema/lists/{group}"))
+                               return request.req.SetPathParam("group", 
request.group).Get(getPath(traceListPath))
                        }, yamlPrinter, enableTLS, insecure, cert)
                },
        }
 
-       bindFileFlag(createCmd, updateCmd)
-       bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd)
-       traceCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd)
+       queryCmd := &cobra.Command{
+               Use:     "query [-s start_time] [-e end_time] -f [file|dir|-]",
+               Version: version.Build(),
+               Short:   "Query data in a trace",
+               Long:    timeRangeUsage,
+               RunE: func(cmd *cobra.Command, _ []string) (err error) {
+                       return rest(func() ([]reqBody, error) { return 
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
+                               func(request request) (*resty.Response, error) {
+                                       return 
request.req.SetBody(request.data).Post(getPath(traceQueryPath))
+                               }, yamlPrinter, enableTLS, insecure, cert)
+               },
+       }
+
+       bindFileFlag(createCmd, updateCmd, queryCmd)
+       bindTimeRangeFlag(queryCmd)
+       bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd, 
queryCmd)
+       traceCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd, 
queryCmd)
        return traceCmd
 }
diff --git a/bydbctl/internal/cmd/trace_test.go b/bydbctl/internal/cmd/trace_test.go
index 16934834..18b33a2a 100644
--- a/bydbctl/internal/cmd/trace_test.go
+++ b/bydbctl/internal/cmd/trace_test.go
@@ -18,18 +18,25 @@
 package cmd_test
 
 import (
+       "fmt"
        "strings"
+       "time"
 
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
        "github.com/spf13/cobra"
        "github.com/zenizh/go-capturer"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       cases_trace_data 
"github.com/apache/skywalking-banyandb/test/cases/trace/data"
 )
 
 var _ = Describe("Trace Schema Operation", func() {
@@ -184,3 +191,108 @@ timestamp_tag_name: timestamp`))
                deferFunc()
        })
 })
+
+var _ = Describe("Trace Data Query", func() {
+       var addr, grpcAddr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
+       var now time.Time
+       var nowStr, endStr string
+       var interval time.Duration
+       BeforeEach(func() {
+               var err error
+               now, err = time.ParseInLocation("2006-01-02T15:04:05", 
"2021-09-01T23:30:00", time.Local)
+               Expect(err).NotTo(HaveOccurred())
+               nowStr = now.Format(time.RFC3339)
+               interval = 500 * time.Millisecond
+               endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
+               grpcAddr, addr, deferFunc = setup.Standalone()
+               addr = httpSchema + addr
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+       })
+
+       It("query trace all data", func() {
+               conn, err := grpclib.NewClient(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+
+               cases_trace_data.Write(conn, "sw", now, interval)
+               rootCmd.SetArgs([]string{"trace", "query", "-a", addr, "-f", 
"-"})
+               issue := func() string {
+                       sprintf := fmt.Sprintf(`
+name: sw
+groups: ["test-trace-group"]
+timeRange:
+  begin: %s
+  end: %s
+limit: 100
+orderBy:
+  indexRuleName: "timestamp"
+  sort: SORT_DESC`, nowStr, endStr)
+                       rootCmd.SetIn(strings.NewReader(sprintf))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               Expect(err).NotTo(HaveOccurred())
+                       })
+               }
+               Eventually(issue, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue()
+                       resp := new(tracev1.QueryResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.Traces)
+               }, flags.EventuallyTimeout).Should(Equal(5))
+       })
+
+       DescribeTable("query trace data with time range flags", func(timeArgs 
...string) {
+               conn, err := grpclib.NewClient(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+               now := timestamp.NowMilli()
+               interval := -1 * time.Millisecond
+               cases_trace_data.Write(conn, "sw", now, interval)
+               args := []string{"trace", "query", "-a", addr}
+               args = append(args, timeArgs...)
+               args = append(args, "-f", "-")
+               rootCmd.SetArgs(args)
+               issue := func() string {
+                       rootCmd.SetIn(strings.NewReader(`
+name: sw
+groups: ["test-trace-group"]
+limit: 100
+orderBy:
+  indexRuleName: "timestamp"
+  sort: SORT_DESC`))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               Expect(err).NotTo(HaveOccurred())
+                       })
+               }
+               Eventually(issue, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue()
+                       resp := new(tracev1.QueryResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.Traces)
+               }, flags.EventuallyTimeout).Should(Equal(5))
+       },
+               Entry("relative start", "--start", "-30m"),
+               Entry("relative end", "--end", "0m"),
+               Entry("absolute start", "--start", nowStr),
+               Entry("absolute end", "--end", endStr),
+               Entry("default"),
+               Entry("all relative", "--start", "-30m", "--end", "0m"),
+               Entry("all absolute", "--start", nowStr, "--end", endStr),
+       )
+
+       AfterEach(func() {
+               deferFunc()
+       })
+})

Reply via email to