This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch perf/trace-vs-stream
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 1848cca74614e15cd646fcaed4f08b88ba09b5eb Author: Gao Hongtao <[email protected]> AuthorDate: Fri Sep 5 12:35:55 2025 +0000 feat: add performance comparison test for Stream and Trace models - Introduced a new directory for performance tests comparing BanyanDB's Stream and Trace models. - Added schema definitions, including group, stream, and trace schemas, along with index rules and bindings. - Implemented schema loading and verification logic for both models. - Created clients for interacting with Stream and Trace services. - Developed a test suite using Ginkgo to validate schema setup and prepare for performance benchmarking. --- test/stress/stream-vs-trace/README.md | 76 +++++ test/stress/stream-vs-trace/schema_loader.go | 308 +++++++++++++++++++++ test/stress/stream-vs-trace/stream_client.go | 137 +++++++++ .../stream-vs-trace/stream_vs_trace_suite_test.go | 159 +++++++++++ .../stream-vs-trace/testdata/schema/README.md | 52 ++++ .../stream-vs-trace/testdata/schema/group.json | 36 +++ .../schema/stream_index_rule_bindings.json | 23 ++ .../testdata/schema/stream_index_rules.json | 58 ++++ .../testdata/schema/stream_schema.json | 65 +++++ .../testdata/schema/trace_index_rule_bindings.json | 18 ++ .../testdata/schema/trace_index_rules.json | 18 ++ .../testdata/schema/trace_schema.json | 54 ++++ test/stress/stream-vs-trace/trace_client.go | 137 +++++++++ 13 files changed, 1141 insertions(+) diff --git a/test/stress/stream-vs-trace/README.md b/test/stress/stream-vs-trace/README.md new file mode 100644 index 00000000..f76c071b --- /dev/null +++ b/test/stress/stream-vs-trace/README.md @@ -0,0 +1,76 @@ +# Stream vs Trace Performance Test + +This directory contains the performance comparison test between BanyanDB's Stream and Trace models for storing SkyWalking segment data (spans). + +## Overview + +The test compares the performance characteristics of two different data models: +- **Stream Model**: Entity-based structure with individual tag indexes +- **Trace Model**: Trace-specific structure with composite indexes + +## Test Structure + +### Schema Files +- `testdata/schema/` - Contains all schema definitions (groups, schemas, index rules, bindings) +- `schema_loader.go` - Loads schemas into BanyanDB during test setup +- `stream_vs_trace_suite_test.go` - Main test suite using Ginkgo +- `stream_client.go` - Stream model client for operations +- `trace_client.go` - Trace model client for operations + +### Test Setup + +The test uses `setup.ClosableStandaloneWithSchemaLoaders` to: +1. Start a standalone BanyanDB instance +2. Load both stream and trace schemas +3. Create necessary groups, index rules, and bindings +4. Verify schema creation + +### Schema Differences + +#### Stream Model (`stream_performance_test` group) +- **Entity**: `serviceId + serviceInstanceId` +- **Catalog**: `CATALOG_STREAM` +- **Indexes**: Individual indexes for each tag (except entity tags and dataBinary) +- **Data Binary**: Stored in separate tag family + +#### Trace Model (`trace_performance_test` group) +- **Catalog**: `CATALOG_TRACE` +- **Indexes**: Two composite indexes: + - `serviceId + serviceInstanceId + startTime` + - `serviceId + serviceInstanceId + latency` +- **Data Binary**: Stored as regular tag + +## Running the Test + +```bash +# Run the performance test +cd test/stress/stream-vs-trace +go test -v -timeout 30m + +# Run with specific Ginkgo labels +go test -v -timeout 30m --ginkgo.label-filter="performance" +``` + +## Test Phases + +1. **Schema Setup**: Creates groups, schemas, and index rules +2. 
**Schema Verification**: Verifies both models are properly created +3. **Performance Testing**: (TODO) Implements actual performance benchmarks +4. **Cleanup**: Cleans up test resources + +## Expected Results + +The test will verify that: +- Both stream and trace schemas are created successfully +- All index rules and bindings are applied correctly +- Both models are ready for performance testing + +## Next Steps + +This is the foundation for the full performance comparison test. The actual performance benchmarks (write throughput, query latency, storage efficiency) will be implemented in subsequent phases. + +## Dependencies + +- Ginkgo v2 for test framework +- BanyanDB test setup utilities +- gRPC clients for stream and trace operations diff --git a/test/stress/stream-vs-trace/schema_loader.go b/test/stress/stream-vs-trace/schema_loader.go new file mode 100644 index 00000000..7578e668 --- /dev/null +++ b/test/stress/stream-vs-trace/schema_loader.go @@ -0,0 +1,308 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package streamvstrace + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + + "github.com/pkg/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protojson" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// SchemaLoader defines the interface for schema loading operations. 
+type SchemaLoader interface { + Name() string + PreRun(ctx context.Context) error + SetRegistry(registry schema.Registry) +} + +type schemaLoader struct { + registry schema.Registry + name string +} + +func (s *schemaLoader) Name() string { + return "schema-loader-" + s.name +} + +func (s *schemaLoader) PreRun(ctx context.Context) error { + e := s.registry + + // Load groups + if err := s.loadGroups(ctx, e); err != nil { + return errors.WithStack(err) + } + + // Load stream schemas + if err := s.loadStreamSchemas(ctx, e); err != nil { + return errors.WithStack(err) + } + + // Load trace schemas + if err := s.loadTraceSchemas(ctx, e); err != nil { + return errors.WithStack(err) + } + + // Load stream index rules + if err := s.loadStreamIndexRules(ctx, e); err != nil { + return errors.WithStack(err) + } + + // Load trace index rules + if err := s.loadTraceIndexRules(ctx, e); err != nil { + return errors.WithStack(err) + } + + // Load stream index rule bindings + if err := s.loadStreamIndexRuleBindings(ctx, e); err != nil { + return errors.WithStack(err) + } + + // Load trace index rule bindings + if err := s.loadTraceIndexRuleBindings(ctx, e); err != nil { + return errors.WithStack(err) + } + + return nil +} + +func (s *schemaLoader) SetRegistry(registry schema.Registry) { + s.registry = registry +} + +func (s *schemaLoader) loadGroups(ctx context.Context, e schema.Registry) error { + groupFile := filepath.Join("testdata", "schema", "group.json") + data, err := os.ReadFile(groupFile) + if err != nil { + return errors.Wrapf(err, "failed to read group file: %s", groupFile) + } + + var groupData []json.RawMessage + if err := json.Unmarshal(data, &groupData); err != nil { + return errors.Wrapf(err, "failed to unmarshal groups array") + } + + for i, groupBytes := range groupData { + var group commonv1.Group + if err := protojson.Unmarshal(groupBytes, &group); err != nil { + return errors.Wrapf(err, "failed to unmarshal group %d", i) + } + + if err := e.CreateGroup(ctx, &group); err != nil { + if status.Code(err) == codes.AlreadyExists { + logger.Infof("Group %s already exists, skipping", group.Metadata.Name) + continue + } + return errors.Wrapf(err, "failed to create group: %s", group.Metadata.Name) + } + logger.Infof("Created group: %s", group.Metadata.Name) + } + + return nil +} + +func (s *schemaLoader) loadStreamSchemas(ctx context.Context, e schema.Registry) error { + streamFile := filepath.Join("testdata", "schema", "stream_schema.json") + data, err := os.ReadFile(streamFile) + if err != nil { + return errors.Wrapf(err, "failed to read stream schema file: %s", streamFile) + } + + var stream databasev1.Stream + if err = protojson.Unmarshal(data, &stream); err != nil { + return errors.Wrapf(err, "failed to unmarshal stream schema") + } + + _, err = e.CreateStream(ctx, &stream) + if err != nil { + if status.Code(err) == codes.AlreadyExists { + logger.Infof("Stream %s already exists, skipping", stream.Metadata.Name) + return nil + } + return errors.Wrapf(err, "failed to create stream: %s", stream.Metadata.Name) + } + + logger.Infof("Created stream: %s", stream.Metadata.Name) + return nil +} + +func (s *schemaLoader) loadTraceSchemas(ctx context.Context, e schema.Registry) error { + traceFile := filepath.Join("testdata", "schema", "trace_schema.json") + data, err := os.ReadFile(traceFile) + if err != nil { + return errors.Wrapf(err, "failed to read trace schema file: %s", traceFile) + } + + var trace databasev1.Trace + if err = protojson.Unmarshal(data, &trace); err != nil { + return 
errors.Wrapf(err, "failed to unmarshal trace schema") + } + + _, err = e.CreateTrace(ctx, &trace) + if err != nil { + if status.Code(err) == codes.AlreadyExists { + logger.Infof("Trace %s already exists, skipping", trace.Metadata.Name) + return nil + } + return errors.Wrapf(err, "failed to create trace: %s", trace.Metadata.Name) + } + + logger.Infof("Created trace: %s", trace.Metadata.Name) + return nil +} + +func (s *schemaLoader) loadStreamIndexRules(ctx context.Context, e schema.Registry) error { + indexFile := filepath.Join("testdata", "schema", "stream_index_rules.json") + data, err := os.ReadFile(indexFile) + if err != nil { + return errors.Wrapf(err, "failed to read stream index rules file: %s", indexFile) + } + + var indexRuleData []json.RawMessage + if err := json.Unmarshal(data, &indexRuleData); err != nil { + return errors.Wrapf(err, "failed to unmarshal stream index rules array") + } + + for i, ruleBytes := range indexRuleData { + var rule databasev1.IndexRule + if err := protojson.Unmarshal(ruleBytes, &rule); err != nil { + return errors.Wrapf(err, "failed to unmarshal stream index rule %d", i) + } + + if err := e.CreateIndexRule(ctx, &rule); err != nil { + if status.Code(err) == codes.AlreadyExists { + logger.Infof("Stream index rule %s already exists, skipping", rule.Metadata.Name) + continue + } + return errors.Wrapf(err, "failed to create stream index rule: %s", rule.Metadata.Name) + } + logger.Infof("Created stream index rule: %s", rule.Metadata.Name) + } + + return nil +} + +func (s *schemaLoader) loadTraceIndexRules(ctx context.Context, e schema.Registry) error { + indexFile := filepath.Join("testdata", "schema", "trace_index_rules.json") + data, err := os.ReadFile(indexFile) + if err != nil { + return errors.Wrapf(err, "failed to read trace index rules file: %s", indexFile) + } + + var indexRuleData []json.RawMessage + if err := json.Unmarshal(data, &indexRuleData); err != nil { + return errors.Wrapf(err, "failed to unmarshal trace index rules array") + } + + for i, ruleBytes := range indexRuleData { + var rule databasev1.IndexRule + if err := protojson.Unmarshal(ruleBytes, &rule); err != nil { + return errors.Wrapf(err, "failed to unmarshal trace index rule %d", i) + } + + if err := e.CreateIndexRule(ctx, &rule); err != nil { + if status.Code(err) == codes.AlreadyExists { + logger.Infof("Trace index rule %s already exists, skipping", rule.Metadata.Name) + continue + } + return errors.Wrapf(err, "failed to create trace index rule: %s", rule.Metadata.Name) + } + logger.Infof("Created trace index rule: %s", rule.Metadata.Name) + } + + return nil +} + +func (s *schemaLoader) loadStreamIndexRuleBindings(ctx context.Context, e schema.Registry) error { + bindingFile := filepath.Join("testdata", "schema", "stream_index_rule_bindings.json") + data, err := os.ReadFile(bindingFile) + if err != nil { + return errors.Wrapf(err, "failed to read stream index rule bindings file: %s", bindingFile) + } + + var bindingData []json.RawMessage + if err := json.Unmarshal(data, &bindingData); err != nil { + return errors.Wrapf(err, "failed to unmarshal stream index rule bindings array") + } + + for i, bindingBytes := range bindingData { + var binding databasev1.IndexRuleBinding + if err := protojson.Unmarshal(bindingBytes, &binding); err != nil { + return errors.Wrapf(err, "failed to unmarshal stream index rule binding %d", i) + } + + if err := e.CreateIndexRuleBinding(ctx, &binding); err != nil { + if status.Code(err) == codes.AlreadyExists { + logger.Infof("Stream index rule binding %s 
already exists, skipping", binding.Metadata.Name) + continue + } + return errors.Wrapf(err, "failed to create stream index rule binding: %s", binding.Metadata.Name) + } + logger.Infof("Created stream index rule binding: %s", binding.Metadata.Name) + } + + return nil +} + +func (s *schemaLoader) loadTraceIndexRuleBindings(ctx context.Context, e schema.Registry) error { + bindingFile := filepath.Join("testdata", "schema", "trace_index_rule_bindings.json") + data, err := os.ReadFile(bindingFile) + if err != nil { + return errors.Wrapf(err, "failed to read trace index rule bindings file: %s", bindingFile) + } + + var bindingData []json.RawMessage + if err := json.Unmarshal(data, &bindingData); err != nil { + return errors.Wrapf(err, "failed to unmarshal trace index rule bindings array") + } + + for i, bindingBytes := range bindingData { + var binding databasev1.IndexRuleBinding + if err := protojson.Unmarshal(bindingBytes, &binding); err != nil { + return errors.Wrapf(err, "failed to unmarshal trace index rule binding %d", i) + } + + if err := e.CreateIndexRuleBinding(ctx, &binding); err != nil { + if status.Code(err) == codes.AlreadyExists { + logger.Infof("Trace index rule binding %s already exists, skipping", binding.Metadata.Name) + continue + } + return errors.Wrapf(err, "failed to create trace index rule binding: %s", binding.Metadata.Name) + } + logger.Infof("Created trace index rule binding: %s", binding.Metadata.Name) + } + + return nil +} + +// NewSchemaLoader creates a new schema loader for the stream vs trace performance test. +func NewSchemaLoader(name string) SchemaLoader { + return &schemaLoader{ + name: name, + } +} diff --git a/test/stress/stream-vs-trace/stream_client.go b/test/stress/stream-vs-trace/stream_client.go new file mode 100644 index 00000000..ac3d99aa --- /dev/null +++ b/test/stress/stream-vs-trace/stream_client.go @@ -0,0 +1,137 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package streamvstrace + +import ( + "context" + "fmt" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" +) + +// StreamClient provides methods to interact with stream services. 
+type StreamClient struct { + registryClient databasev1.StreamRegistryServiceClient + serviceClient streamv1.StreamServiceClient + groupClient databasev1.GroupRegistryServiceClient + indexRuleClient databasev1.IndexRuleRegistryServiceClient + indexRuleBindingClient databasev1.IndexRuleBindingRegistryServiceClient +} + +// NewStreamClient creates a new StreamClient instance. +func NewStreamClient(conn *grpc.ClientConn) *StreamClient { + return &StreamClient{ + registryClient: databasev1.NewStreamRegistryServiceClient(conn), + serviceClient: streamv1.NewStreamServiceClient(conn), + groupClient: databasev1.NewGroupRegistryServiceClient(conn), + indexRuleClient: databasev1.NewIndexRuleRegistryServiceClient(conn), + indexRuleBindingClient: databasev1.NewIndexRuleBindingRegistryServiceClient(conn), + } +} + +// VerifySchema checks if a stream schema exists. +func (c *StreamClient) VerifySchema(ctx context.Context, group, name string) (bool, error) { + req := &databasev1.StreamRegistryServiceGetRequest{ + Metadata: &commonv1.Metadata{ + Group: group, + Name: name, + }, + } + + _, err := c.registryClient.Get(ctx, req) + if err != nil { + if status.Code(err) == codes.NotFound { + return false, nil + } + return false, fmt.Errorf("failed to get stream schema: %w", err) + } + + return true, nil +} + +func (c *StreamClient) Write(ctx context.Context, _ *streamv1.WriteRequest) (streamv1.StreamService_WriteClient, error) { + return c.serviceClient.Write(ctx) +} + +// Query executes a query against the stream service. +func (c *StreamClient) Query(ctx context.Context, req *streamv1.QueryRequest) (*streamv1.QueryResponse, error) { + return c.serviceClient.Query(ctx, req) +} + +// VerifyGroup checks if a group exists. +func (c *StreamClient) VerifyGroup(ctx context.Context, group string) (bool, error) { + req := &databasev1.GroupRegistryServiceGetRequest{ + Group: group, + } + + _, err := c.groupClient.Get(ctx, req) + if err != nil { + if status.Code(err) == codes.NotFound { + return false, nil + } + return false, fmt.Errorf("failed to get group: %w", err) + } + + return true, nil +} + +// VerifyIndexRule checks if an index rule exists. +func (c *StreamClient) VerifyIndexRule(ctx context.Context, group, name string) (bool, error) { + req := &databasev1.IndexRuleRegistryServiceGetRequest{ + Metadata: &commonv1.Metadata{ + Group: group, + Name: name, + }, + } + + _, err := c.indexRuleClient.Get(ctx, req) + if err != nil { + if status.Code(err) == codes.NotFound { + return false, nil + } + return false, fmt.Errorf("failed to get index rule: %w", err) + } + + return true, nil +} + +// VerifyIndexRuleBinding checks if an index rule binding exists. +func (c *StreamClient) VerifyIndexRuleBinding(ctx context.Context, group, name string) (bool, error) { + req := &databasev1.IndexRuleBindingRegistryServiceGetRequest{ + Metadata: &commonv1.Metadata{ + Group: group, + Name: name, + }, + } + + _, err := c.indexRuleBindingClient.Get(ctx, req) + if err != nil { + if status.Code(err) == codes.NotFound { + return false, nil + } + return false, fmt.Errorf("failed to get index rule binding: %w", err) + } + + return true, nil +} diff --git a/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go b/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go new file mode 100644 index 00000000..b3f24f68 --- /dev/null +++ b/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go @@ -0,0 +1,159 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package streamvstrace provides performance comparison test between Stream and Trace models. +package streamvstrace + +import ( + "context" + "fmt" + "testing" + "time" + + g "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + "github.com/apache/skywalking-banyandb/pkg/test/setup" +) + +func TestStreamVsTrace(t *testing.T) { + gomega.RegisterFailHandler(g.Fail) + g.RunSpecs(t, "Stream vs Trace Performance Suite", g.Label("integration", "performance", "slow")) +} + +var _ = g.Describe("Stream vs Trace Performance", func() { + g.BeforeEach(func() { + gomega.Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + })).To(gomega.Succeed()) + }) + + g.It("should setup schemas and run performance comparison", func() { + path, deferFn, err := test.NewSpace() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + defer deferFn() + + var ports []int + ports, err = test.AllocateFreePorts(4) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // Setup BanyanDB with schema loaders + addr, _, closerServerFunc := setup.ClosableStandaloneWithSchemaLoaders( + path, ports, + []setup.SchemaLoader{NewSchemaLoader("performance-test")}, + "--logging-level", "info") + + g.DeferCleanup(func() { + closerServerFunc() + helpers.PrintDiskUsage(path, 5, 0) + }) + + // Wait for server to be ready + gomega.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())), + flags.EventuallyTimeout).Should(gomega.Succeed()) + + // Create gRPC connection + conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + defer conn.Close() + + // Run performance tests + g.By("Running Stream vs Trace performance comparison") + + // TODO: Implement actual performance tests + // This is a placeholder for the performance test implementation + fmt.Println("Schema setup completed successfully!") + fmt.Println("Stream group: stream_performance_test") + fmt.Println("Trace group: trace_performance_test") + fmt.Println("Ready to run performance tests...") + + // Verify schemas are created + ctx := context.Background() + + // Test basic connectivity + streamClient := NewStreamClient(conn) + traceClient := NewTraceClient(conn) + + // Verify stream group + streamGroupExists, err := streamClient.VerifyGroup(ctx, "stream_performance_test") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + 
gomega.Expect(streamGroupExists).To(gomega.BeTrue()) + + // Verify trace group + traceGroupExists, err := traceClient.VerifyGroup(ctx, "trace_performance_test") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(traceGroupExists).To(gomega.BeTrue()) + + // Verify stream index rules + streamIndexRules := []string{ + "latency_index", + "trace_id_index", + "span_id_index", + "parent_span_id_index", + "operation_name_index", + "component_index", + "is_error_index", + } + var ruleExists bool + for _, ruleName := range streamIndexRules { + ruleExists, err = streamClient.VerifyIndexRule(ctx, "stream_performance_test", ruleName) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(ruleExists).To(gomega.BeTrue()) + } + + // Verify trace index rules + traceIndexRules := []string{ + "time_based_index", + "latency_based_index", + } + for _, ruleName := range traceIndexRules { + ruleExists, err = traceClient.VerifyIndexRule(ctx, "trace_performance_test", ruleName) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(ruleExists).To(gomega.BeTrue()) + } + + // Verify stream index rule binding + streamBindingExists, err := streamClient.VerifyIndexRuleBinding(ctx, "stream_performance_test", "segment_stream_binding") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(streamBindingExists).To(gomega.BeTrue()) + + // Verify trace index rule binding + traceBindingExists, err := traceClient.VerifyIndexRuleBinding(ctx, "trace_performance_test", "segment_trace_binding") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(traceBindingExists).To(gomega.BeTrue()) + + // Verify stream schema + streamExists, err := streamClient.VerifySchema(ctx, "stream_performance_test", "segment_stream") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(streamExists).To(gomega.BeTrue()) + + // Verify trace schema + traceExists, err := traceClient.VerifySchema(ctx, "trace_performance_test", "segment_trace") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(traceExists).To(gomega.BeTrue()) + + fmt.Println("All schemas, groups, index rules, and index rule bindings verified successfully!") + }) +}) diff --git a/test/stress/stream-vs-trace/testdata/schema/README.md b/test/stress/stream-vs-trace/testdata/schema/README.md new file mode 100644 index 00000000..01ad6709 --- /dev/null +++ b/test/stress/stream-vs-trace/testdata/schema/README.md @@ -0,0 +1,52 @@ +# Schema Files for Stream vs Trace Performance Test + +This directory contains the schema definitions for the performance comparison test between BanyanDB's Stream and Trace models. 
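
All of these files are plain protojson documents, so they can be decoded directly into the generated protobuf types. The snippet below is a minimal, standalone sketch of that pattern, mirroring what `schema_loader.go` does for `group.json`; the `main` wrapper and the relative `testdata/schema` path are assumptions for illustration only.

```go
// Minimal sketch (assumed standalone usage, mirroring schema_loader.go):
// decode group.json into commonv1.Group messages via protojson.
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"

	"google.golang.org/protobuf/encoding/protojson"

	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
)

func main() {
	data, err := os.ReadFile(filepath.Join("testdata", "schema", "group.json"))
	if err != nil {
		panic(err)
	}
	// group.json is a JSON array; split it into raw entries first, then
	// decode each entry with protojson so enum names such as
	// CATALOG_STREAM and CATALOG_TRACE resolve correctly.
	var raw []json.RawMessage
	if err := json.Unmarshal(data, &raw); err != nil {
		panic(err)
	}
	for _, entry := range raw {
		var group commonv1.Group
		if err := protojson.Unmarshal(entry, &group); err != nil {
			panic(err)
		}
		fmt.Println(group.Metadata.Name, group.Catalog)
	}
}
```

The same pattern applies to the other files in this directory, swapping in `databasev1.Stream`, `databasev1.Trace`, `databasev1.IndexRule`, or `databasev1.IndexRuleBinding` as the target type.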
+ +## Files + +### Group Definitions +- `group.json` - Defines two separate groups: + - `stream_performance_test` with CATALOG_STREAM + - `trace_performance_test` with CATALOG_TRACE + +### Stream Model (stream_performance_test group) +- `stream_schema.json` - Stream schema definition with entity-based structure +- `stream_index_rules.json` - Individual index rules for each tag (except entity tags and dataBinary) +- `stream_index_rule_bindings.json` - Binds index rules to the stream schema + +### Trace Model (trace_performance_test group) +- `trace_schema.json` - Trace schema definition with trace-specific structure +- `trace_index_rules.json` - Two composite index rules (time-based and latency-based) +- `trace_index_rule_bindings.json` - Binds index rules to the trace schema + +## Schema Differences + +### Stream Model +- Uses entity-based structure with `serviceId` and `serviceInstanceId` as entities +- `dataBinary` is stored in a separate tag family +- Individual index rules for each tag (except entity tags and dataBinary) +- No time-based index (startTime is not indexed) + +### Trace Model +- Uses trace-specific structure with `traceId` as trace ID tag and `startTime` as timestamp tag +- `dataBinary` is stored as a regular tag +- Two composite index rules: + - `serviceId + serviceInstanceId + startTime` + - `serviceId + serviceInstanceId + latency` + +## Usage + +These schema files can be used to: +1. Create the necessary schemas in BanyanDB for the performance test +2. Validate schema definitions before running tests +3. Reference the exact structure used in the performance comparison + +## Data Structure + +Both schemas are designed to store SkyWalking segment data (spans) with the following fields: +- `serviceId`, `serviceInstanceId` - Service identification +- `traceId`, `spanId`, `parentSpanId` - Trace and span identification +- `startTime`, `latency` - Temporal information +- `operationName`, `component` - Semantic information +- `isError` - Error status +- `dataBinary` - Serialized span data diff --git a/test/stress/stream-vs-trace/testdata/schema/group.json b/test/stress/stream-vs-trace/testdata/schema/group.json new file mode 100644 index 00000000..1bc37b6b --- /dev/null +++ b/test/stress/stream-vs-trace/testdata/schema/group.json @@ -0,0 +1,36 @@ +[ + { + "metadata": { + "name": "stream_performance_test" + }, + "catalog": "CATALOG_STREAM", + "resource_opts": { + "shard_num": 2, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 7 + } + } + }, + { + "metadata": { + "name": "trace_performance_test" + }, + "catalog": "CATALOG_TRACE", + "resource_opts": { + "shard_num": 2, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 7 + } + } + } +] diff --git a/test/stress/stream-vs-trace/testdata/schema/stream_index_rule_bindings.json b/test/stress/stream-vs-trace/testdata/schema/stream_index_rule_bindings.json new file mode 100644 index 00000000..cf0ae432 --- /dev/null +++ b/test/stress/stream-vs-trace/testdata/schema/stream_index_rule_bindings.json @@ -0,0 +1,23 @@ +[ + { + "metadata": { + "name": "segment_stream_binding", + "group": "stream_performance_test" + }, + "rules": [ + "latency_index", + "trace_id_index", + "span_id_index", + "parent_span_id_index", + "operation_name_index", + "component_index", + "is_error_index" + ], + "subject": { + "name": "segment_stream", + "catalog": "CATALOG_STREAM" + }, + "begin_at": "2024-01-01T00:00:00Z", + "expire_at": "2030-01-01T00:00:00Z" + 
} +] diff --git a/test/stress/stream-vs-trace/testdata/schema/stream_index_rules.json b/test/stress/stream-vs-trace/testdata/schema/stream_index_rules.json new file mode 100644 index 00000000..fcecafd1 --- /dev/null +++ b/test/stress/stream-vs-trace/testdata/schema/stream_index_rules.json @@ -0,0 +1,58 @@ +[ + { + "metadata": { + "name": "latency_index", + "group": "stream_performance_test" + }, + "tags": ["latency"], + "type": "TYPE_INVERTED" + }, + { + "metadata": { + "name": "trace_id_index", + "group": "stream_performance_test" + }, + "tags": ["traceId"], + "type": "TYPE_INVERTED" + }, + { + "metadata": { + "name": "span_id_index", + "group": "stream_performance_test" + }, + "tags": ["spanId"], + "type": "TYPE_INVERTED" + }, + { + "metadata": { + "name": "parent_span_id_index", + "group": "stream_performance_test" + }, + "tags": ["parentSpanId"], + "type": "TYPE_INVERTED" + }, + { + "metadata": { + "name": "operation_name_index", + "group": "stream_performance_test" + }, + "tags": ["operationName"], + "type": "TYPE_INVERTED" + }, + { + "metadata": { + "name": "component_index", + "group": "stream_performance_test" + }, + "tags": ["component"], + "type": "TYPE_INVERTED" + }, + { + "metadata": { + "name": "is_error_index", + "group": "stream_performance_test" + }, + "tags": ["isError"], + "type": "TYPE_INVERTED" + } +] diff --git a/test/stress/stream-vs-trace/testdata/schema/stream_schema.json b/test/stress/stream-vs-trace/testdata/schema/stream_schema.json new file mode 100644 index 00000000..cbc323e3 --- /dev/null +++ b/test/stress/stream-vs-trace/testdata/schema/stream_schema.json @@ -0,0 +1,65 @@ +{ + "metadata": { + "name": "segment_stream", + "group": "stream_performance_test" + }, + "entity": { + "tag_names": ["serviceId", "serviceInstanceId"] + }, + "tag_families": [ + { + "name": "primary", + "tags": [ + { + "name": "serviceId", + "type": "TAG_TYPE_STRING" + }, + { + "name": "serviceInstanceId", + "type": "TAG_TYPE_STRING" + }, + { + "name": "traceId", + "type": "TAG_TYPE_STRING" + }, + { + "name": "startTime", + "type": "TAG_TYPE_INT" + }, + { + "name": "latency", + "type": "TAG_TYPE_INT" + }, + { + "name": "isError", + "type": "TAG_TYPE_INT" + }, + { + "name": "spanId", + "type": "TAG_TYPE_STRING" + }, + { + "name": "parentSpanId", + "type": "TAG_TYPE_STRING" + }, + { + "name": "operationName", + "type": "TAG_TYPE_STRING" + }, + { + "name": "component", + "type": "TAG_TYPE_STRING" + } + ] + }, + { + "name": "data_binary", + "tags": [ + { + "name": "dataBinary", + "type": "TAG_TYPE_DATA_BINARY" + } + ] + } + ] +} diff --git a/test/stress/stream-vs-trace/testdata/schema/trace_index_rule_bindings.json b/test/stress/stream-vs-trace/testdata/schema/trace_index_rule_bindings.json new file mode 100644 index 00000000..f1f3da46 --- /dev/null +++ b/test/stress/stream-vs-trace/testdata/schema/trace_index_rule_bindings.json @@ -0,0 +1,18 @@ +[ + { + "metadata": { + "name": "segment_trace_binding", + "group": "trace_performance_test" + }, + "rules": [ + "time_based_index", + "latency_based_index" + ], + "subject": { + "name": "segment_trace", + "catalog": "CATALOG_TRACE" + }, + "begin_at": "2024-01-01T00:00:00Z", + "expire_at": "2030-01-01T00:00:00Z" + } +] diff --git a/test/stress/stream-vs-trace/testdata/schema/trace_index_rules.json b/test/stress/stream-vs-trace/testdata/schema/trace_index_rules.json new file mode 100644 index 00000000..106d73eb --- /dev/null +++ b/test/stress/stream-vs-trace/testdata/schema/trace_index_rules.json @@ -0,0 +1,18 @@ +[ + { + "metadata": { + "name": 
"time_based_index", + "group": "trace_performance_test" + }, + "tags": ["serviceId", "serviceInstanceId", "startTime"], + "type": "TYPE_INVERTED" + }, + { + "metadata": { + "name": "latency_based_index", + "group": "trace_performance_test" + }, + "tags": ["serviceId", "serviceInstanceId", "latency"], + "type": "TYPE_INVERTED" + } +] diff --git a/test/stress/stream-vs-trace/testdata/schema/trace_schema.json b/test/stress/stream-vs-trace/testdata/schema/trace_schema.json new file mode 100644 index 00000000..2000cc53 --- /dev/null +++ b/test/stress/stream-vs-trace/testdata/schema/trace_schema.json @@ -0,0 +1,54 @@ +{ + "metadata": { + "name": "segment_trace", + "group": "trace_performance_test" + }, + "tags": [ + { + "name": "serviceId", + "type": "TAG_TYPE_STRING" + }, + { + "name": "serviceInstanceId", + "type": "TAG_TYPE_STRING" + }, + { + "name": "traceId", + "type": "TAG_TYPE_STRING" + }, + { + "name": "startTime", + "type": "TAG_TYPE_INT" + }, + { + "name": "latency", + "type": "TAG_TYPE_INT" + }, + { + "name": "isError", + "type": "TAG_TYPE_INT" + }, + { + "name": "spanId", + "type": "TAG_TYPE_STRING" + }, + { + "name": "parentSpanId", + "type": "TAG_TYPE_STRING" + }, + { + "name": "operationName", + "type": "TAG_TYPE_STRING" + }, + { + "name": "component", + "type": "TAG_TYPE_STRING" + }, + { + "name": "dataBinary", + "type": "TAG_TYPE_DATA_BINARY" + } + ], + "trace_id_tag_name": "traceId", + "timestamp_tag_name": "startTime" +} diff --git a/test/stress/stream-vs-trace/trace_client.go b/test/stress/stream-vs-trace/trace_client.go new file mode 100644 index 00000000..f3bd04ef --- /dev/null +++ b/test/stress/stream-vs-trace/trace_client.go @@ -0,0 +1,137 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package streamvstrace + +import ( + "context" + "fmt" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" +) + +// TraceClient provides methods to interact with trace services. +type TraceClient struct { + registryClient databasev1.TraceRegistryServiceClient + serviceClient tracev1.TraceServiceClient + groupClient databasev1.GroupRegistryServiceClient + indexRuleClient databasev1.IndexRuleRegistryServiceClient + indexRuleBindingClient databasev1.IndexRuleBindingRegistryServiceClient +} + +// NewTraceClient creates a new TraceClient instance. 
+func NewTraceClient(conn *grpc.ClientConn) *TraceClient { + return &TraceClient{ + registryClient: databasev1.NewTraceRegistryServiceClient(conn), + serviceClient: tracev1.NewTraceServiceClient(conn), + groupClient: databasev1.NewGroupRegistryServiceClient(conn), + indexRuleClient: databasev1.NewIndexRuleRegistryServiceClient(conn), + indexRuleBindingClient: databasev1.NewIndexRuleBindingRegistryServiceClient(conn), + } +} + +// VerifySchema checks if a trace schema exists. +func (c *TraceClient) VerifySchema(ctx context.Context, group, name string) (bool, error) { + req := &databasev1.TraceRegistryServiceGetRequest{ + Metadata: &commonv1.Metadata{ + Group: group, + Name: name, + }, + } + + _, err := c.registryClient.Get(ctx, req) + if err != nil { + if status.Code(err) == codes.NotFound { + return false, nil + } + return false, fmt.Errorf("failed to get trace schema: %w", err) + } + + return true, nil +} + +func (c *TraceClient) Write(ctx context.Context, _ *tracev1.WriteRequest) (tracev1.TraceService_WriteClient, error) { + return c.serviceClient.Write(ctx) +} + +// Query executes a query against the trace service. +func (c *TraceClient) Query(ctx context.Context, req *tracev1.QueryRequest) (*tracev1.QueryResponse, error) { + return c.serviceClient.Query(ctx, req) +} + +// VerifyGroup checks if a group exists. +func (c *TraceClient) VerifyGroup(ctx context.Context, group string) (bool, error) { + req := &databasev1.GroupRegistryServiceGetRequest{ + Group: group, + } + + _, err := c.groupClient.Get(ctx, req) + if err != nil { + if status.Code(err) == codes.NotFound { + return false, nil + } + return false, fmt.Errorf("failed to get group: %w", err) + } + + return true, nil +} + +// VerifyIndexRule checks if an index rule exists. +func (c *TraceClient) VerifyIndexRule(ctx context.Context, group, name string) (bool, error) { + req := &databasev1.IndexRuleRegistryServiceGetRequest{ + Metadata: &commonv1.Metadata{ + Group: group, + Name: name, + }, + } + + _, err := c.indexRuleClient.Get(ctx, req) + if err != nil { + if status.Code(err) == codes.NotFound { + return false, nil + } + return false, fmt.Errorf("failed to get index rule: %w", err) + } + + return true, nil +} + +// VerifyIndexRuleBinding checks if an index rule binding exists. +func (c *TraceClient) VerifyIndexRuleBinding(ctx context.Context, group, name string) (bool, error) { + req := &databasev1.IndexRuleBindingRegistryServiceGetRequest{ + Metadata: &commonv1.Metadata{ + Group: group, + Name: name, + }, + } + + _, err := c.indexRuleBindingClient.Get(ctx, req) + if err != nil { + if status.Code(err) == codes.NotFound { + return false, nil + } + return false, fmt.Errorf("failed to get index rule binding: %w", err) + } + + return true, nil +}
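
Beyond the schema setup in this commit, the README notes that write-throughput and query-latency benchmarks are deferred to subsequent phases. A hypothetical harness for that phase could time both models through a common closure without committing to either write-request layout yet; `benchmarkWrites` and `writeOne` below are illustrative names and are not part of this commit.

```go
// Hypothetical sketch of a throughput harness for the follow-up benchmark
// phase; writeOne is an assumed stand-in for the model-specific
// stream/trace write logic, which is not implemented yet.
package streamvstrace

import (
	"context"
	"time"
)

// benchmarkWrites runs writeOne repeatedly for the given duration and
// reports how many writes completed plus the mean latency per write.
func benchmarkWrites(ctx context.Context, d time.Duration, writeOne func(context.Context) error) (count int, mean time.Duration, err error) {
	deadline := time.Now().Add(d)
	var total time.Duration
	for time.Now().Before(deadline) {
		start := time.Now()
		if err = writeOne(ctx); err != nil {
			return count, 0, err
		}
		total += time.Since(start)
		count++
	}
	if count > 0 {
		mean = total / time.Duration(count)
	}
	return count, mean, nil
}
```

Each model's closure would wrap the corresponding client's Write stream, so the same loop could report comparable numbers for the Stream and Trace models.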
