This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch perf/trace-vs-stream in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 21694f53d0db716c08a0f01999c8bc3ceebdf9b1 Author: Gao Hongtao <[email protected]> AuthorDate: Mon Sep 8 04:52:31 2025 +0000 refactor: update benchmark and trace client implementations - Commented out the stream write benchmark in benchmark_runner.go for future adjustments. - Changed the timestamp handling in SpanData to use Timestamp instead of UnixMilli in data_generator.go. - Simplified the Write method signature in trace_client.go by removing the unused WriteRequest parameter. - Removed the dataBinary field from the trace schema definition in trace_schema.json to streamline the schema. --- test/stress/stream-vs-trace/benchmark_runner.go | 20 ++++++++++---------- test/stress/stream-vs-trace/data_generator.go | 5 ++--- .../testdata/schema/trace_schema.json | 4 ---- test/stress/stream-vs-trace/trace_client.go | 2 +- 4 files changed, 13 insertions(+), 18 deletions(-) diff --git a/test/stress/stream-vs-trace/benchmark_runner.go b/test/stress/stream-vs-trace/benchmark_runner.go index 5edc0677..638dc7df 100644 --- a/test/stress/stream-vs-trace/benchmark_runner.go +++ b/test/stress/stream-vs-trace/benchmark_runner.go @@ -83,13 +83,13 @@ func (r *BenchmarkRunner) RunWriteBenchmark(ctx context.Context) error { errChan := make(chan error, 2) // Stream write benchmark - wg.Add(1) - go func() { - defer wg.Done() - if err := r.runStreamWriteBenchmark(benchCtx); err != nil { - errChan <- fmt.Errorf("stream write benchmark failed: %w", err) - } - }() + // wg.Add(1) + // go func() { + // defer wg.Done() + // if err := r.runStreamWriteBenchmark(benchCtx); err != nil { + // errChan <- fmt.Errorf("stream write benchmark failed: %w", err) + // } + // }() // Trace write benchmark wg.Add(1) @@ -112,9 +112,9 @@ func (r *BenchmarkRunner) RunWriteBenchmark(ctx context.Context) error { } // Generate reports - fmt.Println("\n=== Stream Model Write Performance ===") - streamReport := r.streamMetrics.GenerateReport("Stream Write", r.config.Scale) - streamReport.PrintReport() + // fmt.Println("\n=== Stream Model Write Performance ===") + // streamReport := r.streamMetrics.GenerateReport("Stream Write", r.config.Scale) + // streamReport.PrintReport() fmt.Println("\n=== Trace Model Write Performance ===") traceReport := r.traceMetrics.GenerateReport("Trace Write", r.config.Scale) diff --git a/test/stress/stream-vs-trace/data_generator.go b/test/stress/stream-vs-trace/data_generator.go index 24a95c65..09081e5d 100644 --- a/test/stress/stream-vs-trace/data_generator.go +++ b/test/stress/stream-vs-trace/data_generator.go @@ -710,14 +710,13 @@ func (s *SpanData) ToTraceWriteRequest() *tracev1.WriteRequest { {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.ServiceID}}}, {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.ServiceInstanceID}}}, {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.TraceID}}}, - {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: s.StartTime.UnixMilli()}}}, + {Value: &modelv1.TagValue_Timestamp{Timestamp: timestamppb.New(s.StartTime)}}, {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: s.Latency.Milliseconds()}}}, {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: isError}}}, {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.SpanID}}}, {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.ParentSpanID}}}, {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.OperationName}}}, {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: s.Component}}}, - {Value: &modelv1.TagValue_BinaryData{BinaryData: s.DataBinary}}, }, Span: s.DataBinary, // For trace model, span data goes in the span field } @@ -755,7 +754,7 @@ func (c *StreamClient) WriteStreamData(ctx context.Context, spans []*SpanData) e // WriteTraceData writes span data to the trace service. func (c *TraceClient) WriteTraceData(ctx context.Context, spans []*SpanData) error { - stream, err := c.Write(ctx, nil) + stream, err := c.Write(ctx) if err != nil { return fmt.Errorf("failed to create trace write client: %w", err) } diff --git a/test/stress/stream-vs-trace/testdata/schema/trace_schema.json b/test/stress/stream-vs-trace/testdata/schema/trace_schema.json index 2000cc53..098bda93 100644 --- a/test/stress/stream-vs-trace/testdata/schema/trace_schema.json +++ b/test/stress/stream-vs-trace/testdata/schema/trace_schema.json @@ -43,10 +43,6 @@ { "name": "component", "type": "TAG_TYPE_STRING" - }, - { - "name": "dataBinary", - "type": "TAG_TYPE_DATA_BINARY" } ], "trace_id_tag_name": "traceId", diff --git a/test/stress/stream-vs-trace/trace_client.go b/test/stress/stream-vs-trace/trace_client.go index f3bd04ef..23325c84 100644 --- a/test/stress/stream-vs-trace/trace_client.go +++ b/test/stress/stream-vs-trace/trace_client.go @@ -70,7 +70,7 @@ func (c *TraceClient) VerifySchema(ctx context.Context, group, name string) (boo return true, nil } -func (c *TraceClient) Write(ctx context.Context, _ *tracev1.WriteRequest) (tracev1.TraceService_WriteClient, error) { +func (c *TraceClient) Write(ctx context.Context) (tracev1.TraceService_WriteClient, error) { return c.serviceClient.Write(ctx) }
