This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch tracetest
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit fb1713612aeda236c36bd68f72c24128ad70bb1c
Author: kezhenxu94 <[email protected]>
AuthorDate: Mon Sep 15 15:33:28 2025 +0800

    tracetest
---
 bydbctl/internal/cmd/trace.go      |  32 ++++++-
 bydbctl/internal/cmd/trace_test.go | 190 +++++++++++++++++++++++++++++++++++++
 test/cases/trace/data/data.go      |  10 ++
 3 files changed, 229 insertions(+), 3 deletions(-)

diff --git a/bydbctl/internal/cmd/trace.go b/bydbctl/internal/cmd/trace.go
index 3318a424..8d886b10 100644
--- a/bydbctl/internal/cmd/trace.go
+++ b/bydbctl/internal/cmd/trace.go
@@ -25,6 +25,7 @@ import (
        "google.golang.org/protobuf/encoding/protojson"
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
@@ -137,8 +138,33 @@ func newTraceCmd() *cobra.Command {
                },
        }
 
-       bindFileFlag(createCmd, updateCmd)
-       bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd)
-       traceCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd)
+       queryCmd := &cobra.Command{
+               Use:     "query [-g group] -f [file|dir|-]",
+               Version: version.Build(),
+               Short:   "Query data in a trace",
+               Long:    timeRangeUsage,
+               RunE: func(cmd *cobra.Command, _ []string) (err error) {
+                       return rest(func() ([]reqBody, error) { return 
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
+                               func(request request) (*resty.Response, error) {
+                                       queryReq := new(tracev1.QueryRequest)
+                                       err := 
protojson.Unmarshal(request.data, queryReq)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+
+                                       b, err := protojson.Marshal(queryReq)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+
+                                       return 
request.req.SetBody(b).Post(getPath("/api/v1/trace/data"))
+                               }, yamlPrinter, enableTLS, insecure, cert)
+               },
+       }
+
+       bindFileFlag(createCmd, updateCmd, queryCmd)
+       bindTimeRangeFlag(queryCmd)
+       bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd, 
queryCmd)
+       traceCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd, 
queryCmd)
        return traceCmd
 }
diff --git a/bydbctl/internal/cmd/trace_test.go b/bydbctl/internal/cmd/trace_test.go
index 16934834..e82f2a03 100644
--- a/bydbctl/internal/cmd/trace_test.go
+++ b/bydbctl/internal/cmd/trace_test.go
@@ -18,24 +18,34 @@
 package cmd_test
 
 import (
+       "context"
+       "fmt"
        "strings"
+       "time"
 
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
        "github.com/spf13/cobra"
        "github.com/zenizh/go-capturer"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/types/known/timestamppb"
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       cases_trace_data 
"github.com/apache/skywalking-banyandb/test/cases/trace/data"
 )
 
 var _ = Describe("Trace Schema Operation", func() {
        var addr string
        var deferFunc func()
        var rootCmd *cobra.Command
+
        BeforeEach(func() {
                _, addr, deferFunc = setup.EmptyStandalone()
                addr = httpSchema + addr
@@ -184,3 +194,183 @@ timestamp_tag_name: timestamp`))
                deferFunc()
        })
 })
+
+var _ = Describe("Trace Data Query", func() {
+       var addr, grpcAddr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
+       var now time.Time
+       var interval time.Duration
+       BeforeEach(func() {
+               var err error
+               now, err = time.ParseInLocation("2006-01-02T15:04:05", 
"2021-09-01T23:30:00", time.Local)
+               Expect(err).NotTo(HaveOccurred())
+               interval = 500 * time.Millisecond
+               grpcAddr, addr, deferFunc = setup.EmptyStandalone()
+               addr = httpSchema + addr
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+
+               // Create trace group
+               rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", 
"-"})
+               createGroup := func() string {
+                       rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: test-trace-group
+catalog: CATALOG_TRACE
+resource_opts:
+  shard_num: 2
+  segment_interval:
+    unit: UNIT_DAY
+    num: 1
+  ttl:
+    unit: UNIT_DAY
+    num: 7`))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               if err != nil {
+                                       GinkgoWriter.Printf("group creation 
fails: %v", err)
+                               }
+                       })
+               }
+               Eventually(createGroup, 
flags.EventuallyTimeout).Should(ContainSubstring("group test-trace-group is 
created"))
+
+               // Create trace schema
+               rootCmd.SetArgs([]string{"trace", "create", "-a", addr, "-f", 
"-"})
+               createTrace := func() string {
+                       rootCmd.SetIn(strings.NewReader(`
+ metadata:
+   name: sw
+   group: test-trace-group
+ tags:
+   - name: trace_id
+     type: TAG_TYPE_STRING
+   - name: state
+     type: TAG_TYPE_INT
+   - name: service_id
+     type: TAG_TYPE_STRING
+   - name: service_instance_id
+     type: TAG_TYPE_STRING
+   - name: endpoint_id
+     type: TAG_TYPE_STRING
+   - name: duration
+     type: TAG_TYPE_INT
+   - name: timestamp
+     type: TAG_TYPE_TIMESTAMP
+ trace_id_tag_name: trace_id
+ timestamp_tag_name: timestamp`))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               if err != nil {
+                                       GinkgoWriter.Printf("trace creation 
fails: %v", err)
+                               }
+                       })
+               }
+               Eventually(createTrace, 
flags.EventuallyTimeout).Should(ContainSubstring("trace test-trace-group.sw is 
created"))
+       })
+
+       It("query trace all data", func() {
+               conn, err := grpclib.NewClient(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+
+               fmt.Printf("Writing data with base time: %s, interval: %v\n", 
now.Format(time.RFC3339Nano), interval)
+               cases_trace_data.Write(conn, "sw", now, interval)
+               
+               // Test direct GRPC query first
+               c := tracev1.NewTraceServiceClient(conn)
+               ctx := context.Background()
+               grpcQuery := &tracev1.QueryRequest{
+                       Name:   "sw",
+                       Groups: []string{"test-trace-group"},
+                       TimeRange: &modelv1.TimeRange{
+                               Begin: timestamppb.New(now.Add(-24 * 
time.Hour)),
+                               End:   timestamppb.New(now.Add(24 * time.Hour)),
+                       },
+               }
+               
+               Eventually(func() int {
+                       resp, err := c.Query(ctx, grpcQuery)
+                       if err != nil {
+                               fmt.Printf("GRPC query error: %v\n", err)
+                               return 0
+                       }
+                       fmt.Printf("GRPC query returned %d spans\n", 
len(resp.Spans))
+                       return len(resp.Spans)
+               }, flags.EventuallyTimeout).Should(Equal(5))
+               
+               // Now test CLI query
+//             rootCmd.SetArgs([]string{"trace", "query", "-a", addr, "-f", 
"-"})
+//             issue := func() string {
+//                     queryYAML := fmt.Sprintf(`
+// name: sw
+// groups: ["test-trace-group"]
+// timeRange:
+//   begin: %s
+//   end: %s`, nowStr, endStr)
+//                     fmt.Printf("Query YAML:\n%s\n", queryYAML)
+//                     rootCmd.SetIn(strings.NewReader(queryYAML))
+//                     return capturer.CaptureStdout(func() {
+//                             err := rootCmd.Execute()
+//                             Expect(err).NotTo(HaveOccurred())
+//                     })
+//             }
+//             Eventually(issue, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+//             Eventually(func() int {
+//                     out := issue()
+//                     GinkgoWriter.Println("Query output:", out)
+//                     resp := new(tracev1.QueryResponse)
+//                     helpers.UnmarshalYAML([]byte(out), resp)
+//                     return len(resp.Spans)
+//             }, flags.EventuallyTimeout).Should(Equal(5))
+       })
+
+//     DescribeTable("query trace data with time range flags", func(timeArgs 
...string) {
+//             conn, err := grpclib.NewClient(
+//                     grpcAddr,
+//                     
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+//             )
+//             Expect(err).NotTo(HaveOccurred())
+//             now := timestamp.NowMilli()
+//             interval := -1 * time.Millisecond
+//             cases_trace_data.Write(conn, "sw", now, interval)
+//             args := []string{"trace", "query", "-a", addr}
+//             args = append(args, timeArgs...)
+//             args = append(args, "-f", "-")
+//             rootCmd.SetArgs(args)
+//             issue := func() string {
+//                     rootCmd.SetIn(strings.NewReader(`
+// name: sw
+// groups: ["test-trace-group"]
+// tagProjection:
+//   - trace_id
+//   - timestamp`))
+//                     return capturer.CaptureStdout(func() {
+//                             err := rootCmd.Execute()
+//                             Expect(err).NotTo(HaveOccurred())
+//                     })
+//             }
+//             Eventually(issue, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+//             Eventually(func() int {
+//                     out := issue()
+//                     resp := new(tracev1.QueryResponse)
+//                     helpers.UnmarshalYAML([]byte(out), resp)
+//                     GinkgoWriter.Println(resp)
+//                     return len(resp.Spans)
+//             }, flags.EventuallyTimeout).Should(Equal(5))
+//     },
+//             Entry("relative start", "--start", "-30m"),
+//             Entry("relative end", "--end", "0m"),
+//             Entry("absolute start", "--start", nowStr),
+//             Entry("absolute end", "--end", endStr),
+//             Entry("default"),
+//             Entry("all relative", "--start", "-30m", "--end", "0m"),
+//             Entry("all absolute", "--start", nowStr, "--end", endStr),
+//     )
+
+       AfterEach(func() {
+               deferFunc()
+       })
+})
diff --git a/test/cases/trace/data/data.go b/test/cases/trace/data/data.go
index e35751eb..d9eda2fa 100644
--- a/test/cases/trace/data/data.go
+++ b/test/cases/trace/data/data.go
@@ -167,6 +167,8 @@ func loadData(stream tracev1.TraceService_WriteClient, metadata *commonv1.Metada
                }
                tagValues = append(tagValues, timestampTag)
 
+               fmt.Printf("Inserting span with data: %s, timestamp: %s\n", 
spanData, timestamp.Format(time.RFC3339Nano))
+
                errInner := stream.Send(&tracev1.WriteRequest{
                        Metadata: metadata,
                        Tags:     tagValues,
@@ -191,20 +193,28 @@ func WriteToGroup(conn *grpclib.ClientConn, name, group, fileName string, baseTi
        schema := databasev1.NewTraceRegistryServiceClient(conn)
        resp, err := schema.Get(context.Background(), 
&databasev1.TraceRegistryServiceGetRequest{Metadata: metadata})
        if err != nil {
+               fmt.Printf("Failed to get trace schema %s.%s: %v\n", group, 
name, err)
+               gm.Expect(err).NotTo(gm.HaveOccurred())
                return
        }
        metadata = resp.GetTrace().GetMetadata()
+       fmt.Printf("Successfully retrieved trace schema %s.%s\n", group, name)
 
        c := tracev1.NewTraceServiceClient(conn)
        ctx := context.Background()
        writeClient, err := c.Write(ctx)
        gm.Expect(err).NotTo(gm.HaveOccurred())
+       
+       fmt.Printf("Starting to load data from %s.json\n", fileName)
        loadData(writeClient, metadata, fmt.Sprintf("%s.json", fileName), 
baseTime, interval)
+       
+       fmt.Printf("Closing write stream\n")
        gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
        gm.Eventually(func() error {
                _, err := writeClient.Recv()
                return err
        }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
+       fmt.Printf("Write completed successfully\n")
 }
 
 // unmarshalYAMLWithSpanEncoding decodes YAML with special handling for span 
data.

Reply via email to