This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch perf/trace-vs-stream
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 21694f53d0db716c08a0f01999c8bc3ceebdf9b1
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Sep 8 04:52:31 2025 +0000

    refactor: update benchmark and trace client implementations
    
    - Commented out the stream write benchmark in benchmark_runner.go for
      future adjustments.
    - Changed the timestamp handling in SpanData to use Timestamp instead of
      UnixMilli in data_generator.go.
    - Simplified the Write method signature in trace_client.go by removing the
      unused WriteRequest parameter.
    - Removed the dataBinary field from the trace schema definition in
      trace_schema.json to streamline the schema.
---
 test/stress/stream-vs-trace/benchmark_runner.go      | 20 ++++++++++----------
 test/stress/stream-vs-trace/data_generator.go        |  5 ++---
 .../testdata/schema/trace_schema.json                |  4 ----
 test/stress/stream-vs-trace/trace_client.go          |  2 +-
 4 files changed, 13 insertions(+), 18 deletions(-)

diff --git a/test/stress/stream-vs-trace/benchmark_runner.go 
b/test/stress/stream-vs-trace/benchmark_runner.go
index 5edc0677..638dc7df 100644
--- a/test/stress/stream-vs-trace/benchmark_runner.go
+++ b/test/stress/stream-vs-trace/benchmark_runner.go
@@ -83,13 +83,13 @@ func (r *BenchmarkRunner) RunWriteBenchmark(ctx 
context.Context) error {
        errChan := make(chan error, 2)
 
        // Stream write benchmark
-       wg.Add(1)
-       go func() {
-               defer wg.Done()
-               if err := r.runStreamWriteBenchmark(benchCtx); err != nil {
-                       errChan <- fmt.Errorf("stream write benchmark failed: 
%w", err)
-               }
-       }()
+       // wg.Add(1)
+       // go func() {
+       //      defer wg.Done()
+       //      if err := r.runStreamWriteBenchmark(benchCtx); err != nil {
+       //              errChan <- fmt.Errorf("stream write benchmark failed: 
%w", err)
+       //      }
+       // }()
 
        // Trace write benchmark
        wg.Add(1)
@@ -112,9 +112,9 @@ func (r *BenchmarkRunner) RunWriteBenchmark(ctx 
context.Context) error {
        }
 
        // Generate reports
-       fmt.Println("\n=== Stream Model Write Performance ===")
-       streamReport := r.streamMetrics.GenerateReport("Stream Write", 
r.config.Scale)
-       streamReport.PrintReport()
+       // fmt.Println("\n=== Stream Model Write Performance ===")
+       // streamReport := r.streamMetrics.GenerateReport("Stream Write", 
r.config.Scale)
+       // streamReport.PrintReport()
 
        fmt.Println("\n=== Trace Model Write Performance ===")
        traceReport := r.traceMetrics.GenerateReport("Trace Write", 
r.config.Scale)
diff --git a/test/stress/stream-vs-trace/data_generator.go 
b/test/stress/stream-vs-trace/data_generator.go
index 24a95c65..09081e5d 100644
--- a/test/stress/stream-vs-trace/data_generator.go
+++ b/test/stress/stream-vs-trace/data_generator.go
@@ -710,14 +710,13 @@ func (s *SpanData) ToTraceWriteRequest() 
*tracev1.WriteRequest {
                        {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.ServiceID}}},
                        {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.ServiceInstanceID}}},
                        {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.TraceID}}},
-                       {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 
s.StartTime.UnixMilli()}}},
+                       {Value: &modelv1.TagValue_Timestamp{Timestamp: 
timestamppb.New(s.StartTime)}},
                        {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 
s.Latency.Milliseconds()}}},
                        {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 
isError}}},
                        {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.SpanID}}},
                        {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.ParentSpanID}}},
                        {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.OperationName}}},
                        {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
s.Component}}},
-                       {Value: &modelv1.TagValue_BinaryData{BinaryData: 
s.DataBinary}},
                },
                Span: s.DataBinary, // For trace model, span data goes in the 
span field
        }
@@ -755,7 +754,7 @@ func (c *StreamClient) WriteStreamData(ctx context.Context, 
spans []*SpanData) e
 
 // WriteTraceData writes span data to the trace service.
 func (c *TraceClient) WriteTraceData(ctx context.Context, spans []*SpanData) 
error {
-       stream, err := c.Write(ctx, nil)
+       stream, err := c.Write(ctx)
        if err != nil {
                return fmt.Errorf("failed to create trace write client: %w", 
err)
        }
diff --git a/test/stress/stream-vs-trace/testdata/schema/trace_schema.json 
b/test/stress/stream-vs-trace/testdata/schema/trace_schema.json
index 2000cc53..098bda93 100644
--- a/test/stress/stream-vs-trace/testdata/schema/trace_schema.json
+++ b/test/stress/stream-vs-trace/testdata/schema/trace_schema.json
@@ -43,10 +43,6 @@
     {
       "name": "component",
       "type": "TAG_TYPE_STRING"
-    },
-    {
-      "name": "dataBinary",
-      "type": "TAG_TYPE_DATA_BINARY"
     }
   ],
   "trace_id_tag_name": "traceId",
diff --git a/test/stress/stream-vs-trace/trace_client.go 
b/test/stress/stream-vs-trace/trace_client.go
index f3bd04ef..23325c84 100644
--- a/test/stress/stream-vs-trace/trace_client.go
+++ b/test/stress/stream-vs-trace/trace_client.go
@@ -70,7 +70,7 @@ func (c *TraceClient) VerifySchema(ctx context.Context, 
group, name string) (boo
        return true, nil
 }
 
-func (c *TraceClient) Write(ctx context.Context, _ *tracev1.WriteRequest) 
(tracev1.TraceService_WriteClient, error) {
+func (c *TraceClient) Write(ctx context.Context) 
(tracev1.TraceService_WriteClient, error) {
        return c.serviceClient.Write(ctx)
 }
 

Reply via email to