This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch tracetest in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit fb1713612aeda236c36bd68f72c24128ad70bb1c Author: kezhenxu94 <[email protected]> AuthorDate: Mon Sep 15 15:33:28 2025 +0800 tracetest --- bydbctl/internal/cmd/trace.go | 32 ++++++- bydbctl/internal/cmd/trace_test.go | 190 +++++++++++++++++++++++++++++++++++++ test/cases/trace/data/data.go | 10 ++ 3 files changed, 229 insertions(+), 3 deletions(-) diff --git a/bydbctl/internal/cmd/trace.go b/bydbctl/internal/cmd/trace.go index 3318a424..8d886b10 100644 --- a/bydbctl/internal/cmd/trace.go +++ b/bydbctl/internal/cmd/trace.go @@ -25,6 +25,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" "github.com/apache/skywalking-banyandb/pkg/version" ) @@ -137,8 +138,33 @@ func newTraceCmd() *cobra.Command { }, } - bindFileFlag(createCmd, updateCmd) - bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd) - traceCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd) + queryCmd := &cobra.Command{ + Use: "query [-g group] -f [file|dir|-]", + Version: version.Build(), + Short: "Query data in a trace", + Long: timeRangeUsage, + RunE: func(cmd *cobra.Command, _ []string) (err error) { + return rest(func() ([]reqBody, error) { return parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) }, + func(request request) (*resty.Response, error) { + queryReq := new(tracev1.QueryRequest) + err := protojson.Unmarshal(request.data, queryReq) + if err != nil { + return nil, err + } + + b, err := protojson.Marshal(queryReq) + if err != nil { + return nil, err + } + + return request.req.SetBody(b).Post(getPath("/api/v1/trace/data")) + }, yamlPrinter, enableTLS, insecure, cert) + }, + } + + bindFileFlag(createCmd, updateCmd, queryCmd) + bindTimeRangeFlag(queryCmd) + bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd, queryCmd) + traceCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd, queryCmd) return traceCmd } diff --git a/bydbctl/internal/cmd/trace_test.go b/bydbctl/internal/cmd/trace_test.go index 16934834..e82f2a03 100644 --- a/bydbctl/internal/cmd/trace_test.go +++ b/bydbctl/internal/cmd/trace_test.go @@ -18,24 +18,34 @@ package cmd_test import ( + "context" + "fmt" "strings" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/spf13/cobra" "github.com/zenizh/go-capturer" + grpclib "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/timestamppb" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd" "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" + cases_trace_data "github.com/apache/skywalking-banyandb/test/cases/trace/data" ) var _ = Describe("Trace Schema Operation", func() { var addr string var deferFunc func() var rootCmd *cobra.Command + BeforeEach(func() { _, addr, deferFunc = setup.EmptyStandalone() addr = httpSchema + addr @@ -184,3 +194,183 @@ timestamp_tag_name: timestamp`)) deferFunc() }) }) + +var _ = Describe("Trace Data Query", func() { + var addr, grpcAddr string + var deferFunc func() + var rootCmd *cobra.Command + var now time.Time + var interval time.Duration + BeforeEach(func() { + var err error + now, err = time.ParseInLocation("2006-01-02T15:04:05", "2021-09-01T23:30:00", time.Local) + Expect(err).NotTo(HaveOccurred()) + interval = 500 * time.Millisecond + grpcAddr, addr, deferFunc = setup.EmptyStandalone() + addr = httpSchema + addr + rootCmd = &cobra.Command{Use: "root"} + cmd.RootCmdFlags(rootCmd) + + // Create trace group + rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"}) + createGroup := func() string { + rootCmd.SetIn(strings.NewReader(` +metadata: + name: test-trace-group +catalog: CATALOG_TRACE +resource_opts: + shard_num: 2 + segment_interval: + unit: UNIT_DAY + num: 1 + ttl: + unit: UNIT_DAY + num: 7`)) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("group creation fails: %v", err) + } + }) + } + Eventually(createGroup, flags.EventuallyTimeout).Should(ContainSubstring("group test-trace-group is created")) + + // Create trace schema + rootCmd.SetArgs([]string{"trace", "create", "-a", addr, "-f", "-"}) + createTrace := func() string { + rootCmd.SetIn(strings.NewReader(` + metadata: + name: sw + group: test-trace-group + tags: + - name: trace_id + type: TAG_TYPE_STRING + - name: state + type: TAG_TYPE_INT + - name: service_id + type: TAG_TYPE_STRING + - name: service_instance_id + type: TAG_TYPE_STRING + - name: endpoint_id + type: TAG_TYPE_STRING + - name: duration + type: TAG_TYPE_INT + - name: timestamp + type: TAG_TYPE_TIMESTAMP + trace_id_tag_name: trace_id + timestamp_tag_name: timestamp`)) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("trace creation fails: %v", err) + } + }) + } + Eventually(createTrace, flags.EventuallyTimeout).Should(ContainSubstring("trace test-trace-group.sw is created")) + }) + + It("query trace all data", func() { + conn, err := grpclib.NewClient( + grpcAddr, + grpclib.WithTransportCredentials(insecure.NewCredentials()), + ) + Expect(err).NotTo(HaveOccurred()) + + fmt.Printf("Writing data with base time: %s, interval: %v\n", now.Format(time.RFC3339Nano), interval) + cases_trace_data.Write(conn, "sw", now, interval) + + // Test direct GRPC query first + c := tracev1.NewTraceServiceClient(conn) + ctx := context.Background() + grpcQuery := &tracev1.QueryRequest{ + Name: "sw", + Groups: []string{"test-trace-group"}, + TimeRange: &modelv1.TimeRange{ + Begin: timestamppb.New(now.Add(-24 * time.Hour)), + End: timestamppb.New(now.Add(24 * time.Hour)), + }, + } + + Eventually(func() int { + resp, err := c.Query(ctx, grpcQuery) + if err != nil { + fmt.Printf("GRPC query error: %v\n", err) + return 0 + } + fmt.Printf("GRPC query returned %d spans\n", len(resp.Spans)) + return len(resp.Spans) + }, flags.EventuallyTimeout).Should(Equal(5)) + + // Now test CLI query +// rootCmd.SetArgs([]string{"trace", "query", "-a", addr, "-f", "-"}) +// issue := func() string { +// queryYAML := fmt.Sprintf(` +// name: sw +// groups: ["test-trace-group"] +// timeRange: +// begin: %s +// end: %s`, nowStr, endStr) +// fmt.Printf("Query YAML:\n%s\n", queryYAML) +// rootCmd.SetIn(strings.NewReader(queryYAML)) +// return capturer.CaptureStdout(func() { +// err := rootCmd.Execute() +// Expect(err).NotTo(HaveOccurred()) +// }) +// } +// Eventually(issue, flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:")) +// Eventually(func() int { +// out := issue() +// GinkgoWriter.Println("Query output:", out) +// resp := new(tracev1.QueryResponse) +// helpers.UnmarshalYAML([]byte(out), resp) +// return len(resp.Spans) +// }, flags.EventuallyTimeout).Should(Equal(5)) + }) + +// DescribeTable("query trace data with time range flags", func(timeArgs ...string) { +// conn, err := grpclib.NewClient( +// grpcAddr, +// grpclib.WithTransportCredentials(insecure.NewCredentials()), +// ) +// Expect(err).NotTo(HaveOccurred()) +// now := timestamp.NowMilli() +// interval := -1 * time.Millisecond +// cases_trace_data.Write(conn, "sw", now, interval) +// args := []string{"trace", "query", "-a", addr} +// args = append(args, timeArgs...) +// args = append(args, "-f", "-") +// rootCmd.SetArgs(args) +// issue := func() string { +// rootCmd.SetIn(strings.NewReader(` +// name: sw +// groups: ["test-trace-group"] +// tagProjection: +// - trace_id +// - timestamp`)) +// return capturer.CaptureStdout(func() { +// err := rootCmd.Execute() +// Expect(err).NotTo(HaveOccurred()) +// }) +// } +// Eventually(issue, flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:")) +// Eventually(func() int { +// out := issue() +// resp := new(tracev1.QueryResponse) +// helpers.UnmarshalYAML([]byte(out), resp) +// GinkgoWriter.Println(resp) +// return len(resp.Spans) +// }, flags.EventuallyTimeout).Should(Equal(5)) +// }, +// Entry("relative start", "--start", "-30m"), +// Entry("relative end", "--end", "0m"), +// Entry("absolute start", "--start", nowStr), +// Entry("absolute end", "--end", endStr), +// Entry("default"), +// Entry("all relative", "--start", "-30m", "--end", "0m"), +// Entry("all absolute", "--start", nowStr, "--end", endStr), +// ) + + AfterEach(func() { + deferFunc() + }) +}) diff --git a/test/cases/trace/data/data.go b/test/cases/trace/data/data.go index e35751eb..d9eda2fa 100644 --- a/test/cases/trace/data/data.go +++ b/test/cases/trace/data/data.go @@ -167,6 +167,8 @@ func loadData(stream tracev1.TraceService_WriteClient, metadata *commonv1.Metada } tagValues = append(tagValues, timestampTag) + fmt.Printf("Inserting span with data: %s, timestamp: %s\n", spanData, timestamp.Format(time.RFC3339Nano)) + errInner := stream.Send(&tracev1.WriteRequest{ Metadata: metadata, Tags: tagValues, @@ -191,20 +193,28 @@ func WriteToGroup(conn *grpclib.ClientConn, name, group, fileName string, baseTi schema := databasev1.NewTraceRegistryServiceClient(conn) resp, err := schema.Get(context.Background(), &databasev1.TraceRegistryServiceGetRequest{Metadata: metadata}) if err != nil { + fmt.Printf("Failed to get trace schema %s.%s: %v\n", group, name, err) + gm.Expect(err).NotTo(gm.HaveOccurred()) return } metadata = resp.GetTrace().GetMetadata() + fmt.Printf("Successfully retrieved trace schema %s.%s\n", group, name) c := tracev1.NewTraceServiceClient(conn) ctx := context.Background() writeClient, err := c.Write(ctx) gm.Expect(err).NotTo(gm.HaveOccurred()) + + fmt.Printf("Starting to load data from %s.json\n", fileName) loadData(writeClient, metadata, fmt.Sprintf("%s.json", fileName), baseTime, interval) + + fmt.Printf("Closing write stream\n") gm.Expect(writeClient.CloseSend()).To(gm.Succeed()) gm.Eventually(func() error { _, err := writeClient.Recv() return err }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF)) + fmt.Printf("Write completed successfully\n") } // unmarshalYAMLWithSpanEncoding decodes YAML with special handling for span data.
