This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch perf/trace-vs-stream in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit f83b6e68cbe9fa6422ed7153fbbad5b6b9b8a96f Author: Gao Hongtao <[email protected]> AuthorDate: Tue Sep 23 00:51:39 2025 +0000 refactor: enhance snapshot removal logic and streamline benchmark execution - Updated the snapshot removal function to mark parts for removal instead of continuing the loop. - Re-enabled the stream write benchmark in benchmark_runner.go for performance testing. - Cleaned up the test suite by uncommenting performance test execution logic in stream_vs_trace_suite_test.go, ensuring proper setup and comparison of Stream and Trace models. --- banyand/internal/sidx/snapshot.go | 1 + test/stress/stream-vs-trace/benchmark_runner.go | 20 +++++------ .../stream-vs-trace/stream_vs_trace_suite_test.go | 39 +++++++++++----------- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/banyand/internal/sidx/snapshot.go b/banyand/internal/sidx/snapshot.go index c16d9e30..e99182d6 100644 --- a/banyand/internal/sidx/snapshot.go +++ b/banyand/internal/sidx/snapshot.go @@ -313,6 +313,7 @@ func (s *snapshot) remove(epoch uint64, toRemove map[uint64]struct{}) *snapshot result.parts = append(result.parts, pw) continue } + continue } pw.markForRemoval() } diff --git a/test/stress/stream-vs-trace/benchmark_runner.go b/test/stress/stream-vs-trace/benchmark_runner.go index 638dc7df..5edc0677 100644 --- a/test/stress/stream-vs-trace/benchmark_runner.go +++ b/test/stress/stream-vs-trace/benchmark_runner.go @@ -83,13 +83,13 @@ func (r *BenchmarkRunner) RunWriteBenchmark(ctx context.Context) error { errChan := make(chan error, 2) // Stream write benchmark - // wg.Add(1) - // go func() { - // defer wg.Done() - // if err := r.runStreamWriteBenchmark(benchCtx); err != nil { - // errChan <- fmt.Errorf("stream write benchmark failed: %w", err) - // } - // }() + wg.Add(1) + go func() { + defer wg.Done() + if err := r.runStreamWriteBenchmark(benchCtx); err != nil { + errChan <- fmt.Errorf("stream write benchmark failed: %w", err) + } + }() // Trace write benchmark wg.Add(1) @@ -112,9 +112,9 @@ func (r *BenchmarkRunner) RunWriteBenchmark(ctx context.Context) error { } // Generate reports - // fmt.Println("\n=== Stream Model Write Performance ===") - // streamReport := r.streamMetrics.GenerateReport("Stream Write", r.config.Scale) - // streamReport.PrintReport() + fmt.Println("\n=== Stream Model Write Performance ===") + streamReport := r.streamMetrics.GenerateReport("Stream Write", r.config.Scale) + streamReport.PrintReport() fmt.Println("\n=== Trace Model Write Performance ===") traceReport := r.traceMetrics.GenerateReport("Trace Write", r.config.Scale) diff --git a/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go b/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go index 4549fa5f..863c673e 100644 --- a/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go +++ b/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go @@ -51,9 +51,9 @@ var _ = g.Describe("Stream vs Trace Performance", func() { }) g.It("should setup schemas and run performance comparison", func() { - path, deferFn, err := test.NewSpace() + path, _, err := test.NewSpace() gomega.Expect(err).NotTo(gomega.HaveOccurred()) - g.DeferCleanup(deferFn) + // g.DeferCleanup(deferFn) var ports []int ports, err = test.AllocateFreePorts(4) @@ -79,30 +79,12 @@ var _ = g.Describe("Stream vs Trace Performance", func() { gomega.Expect(err).NotTo(gomega.HaveOccurred()) defer conn.Close() - // Run performance tests - g.By("Running Stream vs Trace performance comparison") - // Create clients streamClient := NewStreamClient(conn) traceClient := NewTraceClient(conn) - // Create context for operations ctx := context.Background() - // Create benchmark runner with small scale for testing - config := DefaultBenchmarkConfig(SmallScale) - config.TestDuration = 2 * time.Minute // Shorter duration for testing - config.Concurrency = 5 // Lower concurrency for testing - - benchmarkRunner := NewBenchmarkRunner(config, streamClient, traceClient) - - // Run write benchmark - err = benchmarkRunner.RunWriteBenchmark(ctx) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - - // Compare results - benchmarkRunner.CompareResults() - // Verify schemas are created // Test basic connectivity @@ -165,5 +147,22 @@ var _ = g.Describe("Stream vs Trace Performance", func() { gomega.Expect(traceExists).To(gomega.BeTrue()) fmt.Println("All schemas, groups, index rules, and index rule bindings verified successfully!") + + // Run performance tests + g.By("Running Stream vs Trace performance comparison") + + // Create benchmark runner with small scale for testing + config := DefaultBenchmarkConfig(SmallScale) + config.TestDuration = 2 * time.Minute // Shorter duration for testing + config.Concurrency = 5 // Lower concurrency for testing + + benchmarkRunner := NewBenchmarkRunner(config, streamClient, traceClient) + + // Run write benchmark + err = benchmarkRunner.RunWriteBenchmark(ctx) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // Compare results + benchmarkRunner.CompareResults() }) })
