This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch perf/trace-vs-stream
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 1848cca74614e15cd646fcaed4f08b88ba09b5eb
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Sep 5 12:35:55 2025 +0000

    feat: add performance comparison test for Stream and Trace models
    
    - Introduced a new directory for performance tests comparing BanyanDB's 
Stream and Trace models.
    - Added schema definitions, including group, stream, and trace schemas, 
along with index rules and bindings.
    - Implemented schema loading and verification logic for both models.
    - Created clients for interacting with Stream and Trace services.
    - Developed a test suite using Ginkgo to validate schema setup and prepare 
for performance benchmarking.
---
 test/stress/stream-vs-trace/README.md              |  76 +++++
 test/stress/stream-vs-trace/schema_loader.go       | 308 +++++++++++++++++++++
 test/stress/stream-vs-trace/stream_client.go       | 137 +++++++++
 .../stream-vs-trace/stream_vs_trace_suite_test.go  | 159 +++++++++++
 .../stream-vs-trace/testdata/schema/README.md      |  52 ++++
 .../stream-vs-trace/testdata/schema/group.json     |  36 +++
 .../schema/stream_index_rule_bindings.json         |  23 ++
 .../testdata/schema/stream_index_rules.json        |  58 ++++
 .../testdata/schema/stream_schema.json             |  65 +++++
 .../testdata/schema/trace_index_rule_bindings.json |  18 ++
 .../testdata/schema/trace_index_rules.json         |  18 ++
 .../testdata/schema/trace_schema.json              |  54 ++++
 test/stress/stream-vs-trace/trace_client.go        | 137 +++++++++
 13 files changed, 1141 insertions(+)

diff --git a/test/stress/stream-vs-trace/README.md 
b/test/stress/stream-vs-trace/README.md
new file mode 100644
index 00000000..f76c071b
--- /dev/null
+++ b/test/stress/stream-vs-trace/README.md
@@ -0,0 +1,76 @@
+# Stream vs Trace Performance Test
+
+This directory contains the performance comparison test between BanyanDB's 
Stream and Trace models for storing SkyWalking segment data (spans).
+
+## Overview
+
+The test compares the performance characteristics of two different data models:
+- **Stream Model**: Entity-based structure with individual tag indexes
+- **Trace Model**: Trace-specific structure with composite indexes
+
+## Test Structure
+
+### Schema Files
+- `testdata/schema/` - Contains all schema definitions (groups, schemas, index 
rules, bindings)
+- `schema_loader.go` - Loads schemas into BanyanDB during test setup
+- `stream_vs_trace_suite_test.go` - Main test suite using Ginkgo
+- `stream_client.go` - Stream model client for operations
+- `trace_client.go` - Trace model client for operations
+
+### Test Setup
+
+The test uses `setup.ClosableStandaloneWithSchemaLoaders` to:
+1. Start a standalone BanyanDB instance
+2. Load both stream and trace schemas
+3. Create necessary groups, index rules, and bindings
+4. Verify schema creation
+
+### Schema Differences
+
+#### Stream Model (`stream_performance_test` group)
+- **Entity**: `serviceId + serviceInstanceId`
+- **Catalog**: `CATALOG_STREAM`
+- **Indexes**: Individual indexes for each tag (except entity tags and 
dataBinary)
+- **Data Binary**: Stored in a separate tag family
+
+#### Trace Model (`trace_performance_test` group)
+- **Catalog**: `CATALOG_TRACE`
+- **Indexes**: Two composite indexes:
+  - `serviceId + serviceInstanceId + startTime`
+  - `serviceId + serviceInstanceId + latency`
+- **Data Binary**: Stored as a regular tag
+
+## Running the Test
+
+```bash
+# Run the performance test
+cd test/stress/stream-vs-trace
+go test -v -timeout 30m
+
+# Run with specific Ginkgo labels
+go test -v -timeout 30m --ginkgo.label-filter="performance"
+```
+
+## Test Phases
+
+1. **Schema Setup**: Creates groups, schemas, and index rules
+2. **Schema Verification**: Verifies both models are properly created
+3. **Performance Testing**: (TODO) Implements actual performance benchmarks
+4. **Cleanup**: Cleans up test resources
+
+## Expected Results
+
+The test will verify that:
+- Both stream and trace schemas are created successfully
+- All index rules and bindings are applied correctly
+- Both models are ready for performance testing
+
+## Next Steps
+
+This is the foundation for the full performance comparison test. The actual 
performance benchmarks (write throughput, query latency, storage efficiency) 
will be implemented in subsequent phases.
+
+## Dependencies
+
+- Ginkgo v2 for test framework
+- BanyanDB test setup utilities
+- gRPC clients for stream and trace operations
diff --git a/test/stress/stream-vs-trace/schema_loader.go 
b/test/stress/stream-vs-trace/schema_loader.go
new file mode 100644
index 00000000..7578e668
--- /dev/null
+++ b/test/stress/stream-vs-trace/schema_loader.go
@@ -0,0 +1,308 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package streamvstrace
+
+import (
+       "context"
+       "encoding/json"
+       "os"
+       "path/filepath"
+
+       "github.com/pkg/errors"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// SchemaLoader defines the interface for schema loading operations.
+// The test suite hands the value returned by NewSchemaLoader to the standalone
+// server setup as a setup.SchemaLoader.
+type SchemaLoader interface {
+       // Name returns the identifier of the loader.
+       Name() string
+       // PreRun performs the schema loading; it reports the first failure.
+       PreRun(ctx context.Context) error
+       // SetRegistry supplies the schema registry the loader creates schemas in.
+       SetRegistry(registry schema.Registry)
+}
+
+// schemaLoader is the SchemaLoader implementation of this package. It reads
+// the JSON files under testdata/schema and creates their content through a
+// schema.Registry.
+type schemaLoader struct {
+       // registry receives every create call; it is assigned by SetRegistry.
+       registry schema.Registry
+       // name is the suffix Name appends to "schema-loader-".
+       name     string
+}
+
+func (s *schemaLoader) Name() string {
+       return "schema-loader-" + s.name
+}
+
+// PreRun loads every schema object of the test into the registry set through
+// SetRegistry. The steps run in a fixed order: groups, stream schema, trace
+// schema, stream index rules, trace index rules, stream index rule bindings and
+// trace index rule bindings. The first failing step stops the run and its
+// error is returned with a stack trace attached.
+func (s *schemaLoader) PreRun(ctx context.Context) error {
+       reg := s.registry
+
+       steps := []func(context.Context, schema.Registry) error{
+               s.loadGroups,
+               s.loadStreamSchemas,
+               s.loadTraceSchemas,
+               s.loadStreamIndexRules,
+               s.loadTraceIndexRules,
+               s.loadStreamIndexRuleBindings,
+               s.loadTraceIndexRuleBindings,
+       }
+       for _, step := range steps {
+               if err := step(ctx, reg); err != nil {
+                       return errors.WithStack(err)
+               }
+       }
+
+       return nil
+}
+
+// SetRegistry stores the registry that PreRun later uses for all create calls.
+func (s *schemaLoader) SetRegistry(registry schema.Registry) {
+       s.registry = registry
+}
+
+// loadGroups reads testdata/schema/group.json, which holds a JSON array of
+// groups, and creates each of them in the registry. A group that already
+// exists is logged and skipped; any other failure aborts the load and is
+// returned wrapped with context.
+func (s *schemaLoader) loadGroups(ctx context.Context, e schema.Registry) error {
+       groupFile := filepath.Join("testdata", "schema", "group.json")
+       data, err := os.ReadFile(groupFile)
+       if err != nil {
+               return errors.Wrapf(err, "failed to read group file: %s", groupFile)
+       }
+
+       // Split the array first so every element can be decoded by protojson.
+       var groupData []json.RawMessage
+       if err := json.Unmarshal(data, &groupData); err != nil {
+               return errors.Wrap(err, "failed to unmarshal groups array")
+       }
+
+       for i, groupBytes := range groupData {
+               group := new(commonv1.Group)
+               if err := protojson.Unmarshal(groupBytes, group); err != nil {
+                       return errors.Wrapf(err, "failed to unmarshal group %d", i)
+               }
+               // Read the name through the nil-safe generated getters: an entry
+               // without "metadata" must end up as an error, not as a
+               // nil-pointer panic while the message is being formatted.
+               name := group.GetMetadata().GetName()
+
+               if err := e.CreateGroup(ctx, group); err != nil {
+                       if status.Code(err) == codes.AlreadyExists {
+                               logger.Infof("Group %s already exists, skipping", name)
+                               continue
+                       }
+                       return errors.Wrapf(err, "failed to create group: %s", name)
+               }
+               logger.Infof("Created group: %s", name)
+       }
+
+       return nil
+}
+
+func (s *schemaLoader) loadStreamSchemas(ctx context.Context, e 
schema.Registry) error {
+       streamFile := filepath.Join("testdata", "schema", "stream_schema.json")
+       data, err := os.ReadFile(streamFile)
+       if err != nil {
+               return errors.Wrapf(err, "failed to read stream schema file: 
%s", streamFile)
+       }
+
+       var stream databasev1.Stream
+       if err = protojson.Unmarshal(data, &stream); err != nil {
+               return errors.Wrapf(err, "failed to unmarshal stream schema")
+       }
+
+       _, err = e.CreateStream(ctx, &stream)
+       if err != nil {
+               if status.Code(err) == codes.AlreadyExists {
+                       logger.Infof("Stream %s already exists, skipping", 
stream.Metadata.Name)
+                       return nil
+               }
+               return errors.Wrapf(err, "failed to create stream: %s", 
stream.Metadata.Name)
+       }
+
+       logger.Infof("Created stream: %s", stream.Metadata.Name)
+       return nil
+}
+
+func (s *schemaLoader) loadTraceSchemas(ctx context.Context, e 
schema.Registry) error {
+       traceFile := filepath.Join("testdata", "schema", "trace_schema.json")
+       data, err := os.ReadFile(traceFile)
+       if err != nil {
+               return errors.Wrapf(err, "failed to read trace schema file: 
%s", traceFile)
+       }
+
+       var trace databasev1.Trace
+       if err = protojson.Unmarshal(data, &trace); err != nil {
+               return errors.Wrapf(err, "failed to unmarshal trace schema")
+       }
+
+       _, err = e.CreateTrace(ctx, &trace)
+       if err != nil {
+               if status.Code(err) == codes.AlreadyExists {
+                       logger.Infof("Trace %s already exists, skipping", 
trace.Metadata.Name)
+                       return nil
+               }
+               return errors.Wrapf(err, "failed to create trace: %s", 
trace.Metadata.Name)
+       }
+
+       logger.Infof("Created trace: %s", trace.Metadata.Name)
+       return nil
+}
+
+// loadStreamIndexRules reads testdata/schema/stream_index_rules.json, which
+// holds a JSON array of index rules, and creates each rule in the registry.
+// A rule that already exists is logged and skipped; any other failure aborts
+// the load.
+func (s *schemaLoader) loadStreamIndexRules(ctx context.Context, e schema.Registry) error {
+       indexFile := filepath.Join("testdata", "schema", "stream_index_rules.json")
+       data, err := os.ReadFile(indexFile)
+       if err != nil {
+               return errors.Wrapf(err, "failed to read stream index rules file: %s", indexFile)
+       }
+
+       // Split the array first so every element can be decoded by protojson.
+       var indexRuleData []json.RawMessage
+       if err := json.Unmarshal(data, &indexRuleData); err != nil {
+               return errors.Wrap(err, "failed to unmarshal stream index rules array")
+       }
+
+       for i, ruleBytes := range indexRuleData {
+               rule := new(databasev1.IndexRule)
+               if err := protojson.Unmarshal(ruleBytes, rule); err != nil {
+                       return errors.Wrapf(err, "failed to unmarshal stream index rule %d", i)
+               }
+               // Nil-safe getters: a rule without "metadata" must be reported as
+               // an error instead of panicking while the message is formatted.
+               name := rule.GetMetadata().GetName()
+
+               if err := e.CreateIndexRule(ctx, rule); err != nil {
+                       if status.Code(err) == codes.AlreadyExists {
+                               logger.Infof("Stream index rule %s already exists, skipping", name)
+                               continue
+                       }
+                       return errors.Wrapf(err, "failed to create stream index rule: %s", name)
+               }
+               logger.Infof("Created stream index rule: %s", name)
+       }
+
+       return nil
+}
+
+// loadTraceIndexRules reads testdata/schema/trace_index_rules.json, which
+// holds a JSON array of index rules, and creates each rule in the registry.
+// A rule that already exists is logged and skipped; any other failure aborts
+// the load.
+func (s *schemaLoader) loadTraceIndexRules(ctx context.Context, e schema.Registry) error {
+       indexFile := filepath.Join("testdata", "schema", "trace_index_rules.json")
+       data, err := os.ReadFile(indexFile)
+       if err != nil {
+               return errors.Wrapf(err, "failed to read trace index rules file: %s", indexFile)
+       }
+
+       // Split the array first so every element can be decoded by protojson.
+       var indexRuleData []json.RawMessage
+       if err := json.Unmarshal(data, &indexRuleData); err != nil {
+               return errors.Wrap(err, "failed to unmarshal trace index rules array")
+       }
+
+       for i, ruleBytes := range indexRuleData {
+               rule := new(databasev1.IndexRule)
+               if err := protojson.Unmarshal(ruleBytes, rule); err != nil {
+                       return errors.Wrapf(err, "failed to unmarshal trace index rule %d", i)
+               }
+               // Nil-safe getters: a rule without "metadata" must be reported as
+               // an error instead of panicking while the message is formatted.
+               name := rule.GetMetadata().GetName()
+
+               if err := e.CreateIndexRule(ctx, rule); err != nil {
+                       if status.Code(err) == codes.AlreadyExists {
+                               logger.Infof("Trace index rule %s already exists, skipping", name)
+                               continue
+                       }
+                       return errors.Wrapf(err, "failed to create trace index rule: %s", name)
+               }
+               logger.Infof("Created trace index rule: %s", name)
+       }
+
+       return nil
+}
+
+// loadStreamIndexRuleBindings reads
+// testdata/schema/stream_index_rule_bindings.json, which holds a JSON array of
+// index rule bindings, and creates each binding in the registry. A binding
+// that already exists is logged and skipped; any other failure aborts the load.
+func (s *schemaLoader) loadStreamIndexRuleBindings(ctx context.Context, e schema.Registry) error {
+       bindingFile := filepath.Join("testdata", "schema", "stream_index_rule_bindings.json")
+       data, err := os.ReadFile(bindingFile)
+       if err != nil {
+               return errors.Wrapf(err, "failed to read stream index rule bindings file: %s", bindingFile)
+       }
+
+       // Split the array first so every element can be decoded by protojson.
+       var bindingData []json.RawMessage
+       if err := json.Unmarshal(data, &bindingData); err != nil {
+               return errors.Wrap(err, "failed to unmarshal stream index rule bindings array")
+       }
+
+       for i, bindingBytes := range bindingData {
+               binding := new(databasev1.IndexRuleBinding)
+               if err := protojson.Unmarshal(bindingBytes, binding); err != nil {
+                       return errors.Wrapf(err, "failed to unmarshal stream index rule binding %d", i)
+               }
+               // Nil-safe getters: a binding without "metadata" must be reported
+               // as an error instead of panicking while the message is formatted.
+               name := binding.GetMetadata().GetName()
+
+               if err := e.CreateIndexRuleBinding(ctx, binding); err != nil {
+                       if status.Code(err) == codes.AlreadyExists {
+                               logger.Infof("Stream index rule binding %s already exists, skipping", name)
+                               continue
+                       }
+                       return errors.Wrapf(err, "failed to create stream index rule binding: %s", name)
+               }
+               logger.Infof("Created stream index rule binding: %s", name)
+       }
+
+       return nil
+}
+
+// loadTraceIndexRuleBindings reads
+// testdata/schema/trace_index_rule_bindings.json, which holds a JSON array of
+// index rule bindings, and creates each binding in the registry. A binding
+// that already exists is logged and skipped; any other failure aborts the load.
+func (s *schemaLoader) loadTraceIndexRuleBindings(ctx context.Context, e schema.Registry) error {
+       bindingFile := filepath.Join("testdata", "schema", "trace_index_rule_bindings.json")
+       data, err := os.ReadFile(bindingFile)
+       if err != nil {
+               return errors.Wrapf(err, "failed to read trace index rule bindings file: %s", bindingFile)
+       }
+
+       // Split the array first so every element can be decoded by protojson.
+       var bindingData []json.RawMessage
+       if err := json.Unmarshal(data, &bindingData); err != nil {
+               return errors.Wrap(err, "failed to unmarshal trace index rule bindings array")
+       }
+
+       for i, bindingBytes := range bindingData {
+               binding := new(databasev1.IndexRuleBinding)
+               if err := protojson.Unmarshal(bindingBytes, binding); err != nil {
+                       return errors.Wrapf(err, "failed to unmarshal trace index rule binding %d", i)
+               }
+               // Nil-safe getters: a binding without "metadata" must be reported
+               // as an error instead of panicking while the message is formatted.
+               name := binding.GetMetadata().GetName()
+
+               if err := e.CreateIndexRuleBinding(ctx, binding); err != nil {
+                       if status.Code(err) == codes.AlreadyExists {
+                               logger.Infof("Trace index rule binding %s already exists, skipping", name)
+                               continue
+                       }
+                       return errors.Wrapf(err, "failed to create trace index rule binding: %s", name)
+               }
+               logger.Infof("Created trace index rule binding: %s", name)
+       }
+
+       return nil
+}
+
+// NewSchemaLoader returns a SchemaLoader for the stream vs trace performance
+// test. The given name becomes the suffix of the loader's Name. The registry
+// is left unset and has to be provided through SetRegistry before PreRun.
+func NewSchemaLoader(name string) SchemaLoader {
+       loader := new(schemaLoader)
+       loader.name = name
+       return loader
+}
diff --git a/test/stress/stream-vs-trace/stream_client.go 
b/test/stress/stream-vs-trace/stream_client.go
new file mode 100644
index 00000000..ac3d99aa
--- /dev/null
+++ b/test/stress/stream-vs-trace/stream_client.go
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package streamvstrace
+
+import (
+       "context"
+       "fmt"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+)
+
+// StreamClient provides methods to interact with stream services.
+// It bundles the gRPC clients created from one connection by NewStreamClient.
+type StreamClient struct {
+       // registryClient looks up stream schemas (used by VerifySchema).
+       registryClient         databasev1.StreamRegistryServiceClient
+       // serviceClient backs Write and Query.
+       serviceClient          streamv1.StreamServiceClient
+       // groupClient looks up groups (used by VerifyGroup).
+       groupClient            databasev1.GroupRegistryServiceClient
+       // indexRuleClient looks up index rules (used by VerifyIndexRule).
+       indexRuleClient        databasev1.IndexRuleRegistryServiceClient
+       // indexRuleBindingClient looks up index rule bindings (used by
+       // VerifyIndexRuleBinding).
+       indexRuleBindingClient databasev1.IndexRuleBindingRegistryServiceClient
+}
+
+// NewStreamClient builds a StreamClient whose registry, service, group, index
+// rule and index rule binding clients all share the given connection.
+func NewStreamClient(conn *grpc.ClientConn) *StreamClient {
+       client := new(StreamClient)
+       client.registryClient = databasev1.NewStreamRegistryServiceClient(conn)
+       client.serviceClient = streamv1.NewStreamServiceClient(conn)
+       client.groupClient = databasev1.NewGroupRegistryServiceClient(conn)
+       client.indexRuleClient = databasev1.NewIndexRuleRegistryServiceClient(conn)
+       client.indexRuleBindingClient = databasev1.NewIndexRuleBindingRegistryServiceClient(conn)
+       return client
+}
+
+// VerifySchema reports whether the stream schema identified by group and name
+// exists. A NotFound status yields (false, nil); any other lookup error is
+// returned wrapped.
+func (c *StreamClient) VerifySchema(ctx context.Context, group, name string) (bool, error) {
+       meta := &commonv1.Metadata{Group: group, Name: name}
+       _, err := c.registryClient.Get(ctx, &databasev1.StreamRegistryServiceGetRequest{Metadata: meta})
+
+       switch {
+       case err == nil:
+               return true, nil
+       case status.Code(err) == codes.NotFound:
+               return false, nil
+       default:
+               return false, fmt.Errorf("failed to get stream schema: %w", err)
+       }
+}
+
+// Write returns the write stream client obtained from the stream service.
+// The request argument is ignored: only ctx is handed to the underlying call.
+func (c *StreamClient) Write(ctx context.Context, _ *streamv1.WriteRequest) 
(streamv1.StreamService_WriteClient, error) {
+       return c.serviceClient.Write(ctx)
+}
+
+// Query executes a query against the stream service.
+// The request is forwarded as is and the service's response and error are
+// returned unchanged.
+func (c *StreamClient) Query(ctx context.Context, req *streamv1.QueryRequest) 
(*streamv1.QueryResponse, error) {
+       return c.serviceClient.Query(ctx, req)
+}
+
+// VerifyGroup reports whether the named group exists. A NotFound status
+// yields (false, nil); any other lookup error is returned wrapped.
+func (c *StreamClient) VerifyGroup(ctx context.Context, group string) (bool, error) {
+       _, err := c.groupClient.Get(ctx, &databasev1.GroupRegistryServiceGetRequest{Group: group})
+
+       switch {
+       case err == nil:
+               return true, nil
+       case status.Code(err) == codes.NotFound:
+               return false, nil
+       default:
+               return false, fmt.Errorf("failed to get group: %w", err)
+       }
+}
+
+// VerifyIndexRule reports whether the index rule identified by group and name
+// exists. A NotFound status yields (false, nil); any other lookup error is
+// returned wrapped.
+func (c *StreamClient) VerifyIndexRule(ctx context.Context, group, name string) (bool, error) {
+       meta := &commonv1.Metadata{Group: group, Name: name}
+       _, err := c.indexRuleClient.Get(ctx, &databasev1.IndexRuleRegistryServiceGetRequest{Metadata: meta})
+
+       switch {
+       case err == nil:
+               return true, nil
+       case status.Code(err) == codes.NotFound:
+               return false, nil
+       default:
+               return false, fmt.Errorf("failed to get index rule: %w", err)
+       }
+}
+
+// VerifyIndexRuleBinding checks if an index rule binding exists.
+func (c *StreamClient) VerifyIndexRuleBinding(ctx context.Context, group, name 
string) (bool, error) {
+       req := &databasev1.IndexRuleBindingRegistryServiceGetRequest{
+               Metadata: &commonv1.Metadata{
+                       Group: group,
+                       Name:  name,
+               },
+       }
+
+       _, err := c.indexRuleBindingClient.Get(ctx, req)
+       if err != nil {
+               if status.Code(err) == codes.NotFound {
+                       return false, nil
+               }
+               return false, fmt.Errorf("failed to get index rule binding: 
%w", err)
+       }
+
+       return true, nil
+}
diff --git a/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go 
b/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go
new file mode 100644
index 00000000..b3f24f68
--- /dev/null
+++ b/test/stress/stream-vs-trace/stream_vs_trace_suite_test.go
@@ -0,0 +1,159 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package streamvstrace provides performance comparison test between Stream 
and Trace models.
+package streamvstrace
+
+import (
+       "context"
+       "fmt"
+       "testing"
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+// TestStreamVsTrace is the go test entry point of the suite: it registers
+// Ginkgo's Fail as the Gomega fail handler and runs the specs under the
+// "integration", "performance" and "slow" labels.
+func TestStreamVsTrace(t *testing.T) {
+       gomega.RegisterFailHandler(g.Fail)
+       g.RunSpecs(t, "Stream vs Trace Performance Suite", 
g.Label("integration", "performance", "slow"))
+}
+
+// The spec starts a standalone server with the schema loader of this package,
+// then verifies through the stream and trace clients that the groups, index
+// rules, index rule bindings and schemas of both models exist.
+var _ = g.Describe("Stream vs Trace Performance", func() {
+       g.BeforeEach(func() {
+               gomega.Expect(logger.Init(logger.Logging{
+                       Env:   "dev",
+                       Level: flags.LogLevel,
+               })).To(gomega.Succeed())
+       })
+
+       g.It("should setup schemas and run performance comparison", func() {
+               path, deferFn, err := test.NewSpace()
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               // Ginkgo runs DeferCleanup callbacks after this function has
+               // returned and in reverse registration order. Registering the space
+               // cleanup here, first, instead of using a Go defer makes it run
+               // last: after the server has been stopped and the disk usage of
+               // path has been printed.
+               // NOTE(review): assumes deferFn releases the space at path - confirm.
+               g.DeferCleanup(deferFn)
+
+               var ports []int
+               ports, err = test.AllocateFreePorts(4)
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+               // Setup BanyanDB with schema loaders
+               addr, _, closerServerFunc := setup.ClosableStandaloneWithSchemaLoaders(
+                       path, ports,
+                       []setup.SchemaLoader{NewSchemaLoader("performance-test")},
+                       "--logging-level", "info")
+
+               // Registered after the space cleanup, hence executed before it.
+               g.DeferCleanup(func() {
+                       closerServerFunc()
+                       helpers.PrintDiskUsage(path, 5, 0)
+               })
+
+               // Wait for server to be ready
+               gomega.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
+                       flags.EventuallyTimeout).Should(gomega.Succeed())
+
+               // Create gRPC connection
+               conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               // The connection is closed when the spec body returns, i.e. before
+               // the DeferCleanup callbacks stop the server.
+               defer conn.Close()
+
+               // Run performance tests
+               g.By("Running Stream vs Trace performance comparison")
+
+               // TODO: Implement actual performance tests
+               // This is a placeholder for the performance test implementation
+               fmt.Println("Schema setup completed successfully!")
+               fmt.Println("Stream group: stream_performance_test")
+               fmt.Println("Trace group: trace_performance_test")
+               fmt.Println("Ready to run performance tests...")
+
+               // Verify schemas are created
+               ctx := context.Background()
+
+               // Test basic connectivity
+               streamClient := NewStreamClient(conn)
+               traceClient := NewTraceClient(conn)
+
+               // Verify stream group
+               streamGroupExists, err := streamClient.VerifyGroup(ctx, "stream_performance_test")
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Expect(streamGroupExists).To(gomega.BeTrue())
+
+               // Verify trace group
+               traceGroupExists, err := traceClient.VerifyGroup(ctx, "trace_performance_test")
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Expect(traceGroupExists).To(gomega.BeTrue())
+
+               // Verify stream index rules
+               streamIndexRules := []string{
+                       "latency_index",
+                       "trace_id_index",
+                       "span_id_index",
+                       "parent_span_id_index",
+                       "operation_name_index",
+                       "component_index",
+                       "is_error_index",
+               }
+               var ruleExists bool
+               for _, ruleName := range streamIndexRules {
+                       ruleExists, err = streamClient.VerifyIndexRule(ctx, "stream_performance_test", ruleName)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+                       gomega.Expect(ruleExists).To(gomega.BeTrue())
+               }
+
+               // Verify trace index rules
+               traceIndexRules := []string{
+                       "time_based_index",
+                       "latency_based_index",
+               }
+               for _, ruleName := range traceIndexRules {
+                       ruleExists, err = traceClient.VerifyIndexRule(ctx, "trace_performance_test", ruleName)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+                       gomega.Expect(ruleExists).To(gomega.BeTrue())
+               }
+
+               // Verify stream index rule binding
+               streamBindingExists, err := streamClient.VerifyIndexRuleBinding(ctx, "stream_performance_test", "segment_stream_binding")
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Expect(streamBindingExists).To(gomega.BeTrue())
+
+               // Verify trace index rule binding
+               traceBindingExists, err := traceClient.VerifyIndexRuleBinding(ctx, "trace_performance_test", "segment_trace_binding")
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Expect(traceBindingExists).To(gomega.BeTrue())
+
+               // Verify stream schema
+               streamExists, err := streamClient.VerifySchema(ctx, "stream_performance_test", "segment_stream")
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Expect(streamExists).To(gomega.BeTrue())
+
+               // Verify trace schema
+               traceExists, err := traceClient.VerifySchema(ctx, "trace_performance_test", "segment_trace")
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Expect(traceExists).To(gomega.BeTrue())
+
+               fmt.Println("All schemas, groups, index rules, and index rule bindings verified successfully!")
+       })
+})
diff --git a/test/stress/stream-vs-trace/testdata/schema/README.md 
b/test/stress/stream-vs-trace/testdata/schema/README.md
new file mode 100644
index 00000000..01ad6709
--- /dev/null
+++ b/test/stress/stream-vs-trace/testdata/schema/README.md
@@ -0,0 +1,52 @@
+# Schema Files for Stream vs Trace Performance Test
+
+This directory contains the schema definitions for the performance comparison 
test between BanyanDB's Stream and Trace models.
+
+## Files
+
+### Group Definitions
+- `group.json` - Defines two separate groups:
+  - `stream_performance_test` with CATALOG_STREAM
+  - `trace_performance_test` with CATALOG_TRACE
+
+### Stream Model (stream_performance_test group)
+- `stream_schema.json` - Stream schema definition with entity-based structure
+- `stream_index_rules.json` - Individual index rules for each tag (except 
entity tags, `startTime`, and `dataBinary`)
+- `stream_index_rule_bindings.json` - Binds index rules to the stream schema
+
+### Trace Model (trace_performance_test group)
+- `trace_schema.json` - Trace schema definition with trace-specific structure
+- `trace_index_rules.json` - Two composite index rules (time-based and 
latency-based)
+- `trace_index_rule_bindings.json` - Binds index rules to the trace schema
+
+## Schema Differences
+
+### Stream Model
+- Uses entity-based structure with `serviceId` and `serviceInstanceId` as 
entities
+- `dataBinary` is stored in a separate tag family
+- Individual index rules for each tag (except entity tags, `startTime`, and `dataBinary`)
+- No time-based index (startTime is not indexed)
+
+### Trace Model
+- Uses trace-specific structure with `traceId` as trace ID tag and `startTime` 
as timestamp tag
+- `dataBinary` is stored as a regular tag
+- Two composite index rules:
+  - `serviceId + serviceInstanceId + startTime`
+  - `serviceId + serviceInstanceId + latency`
+
+## Usage
+
+These schema files can be used to:
+1. Create the necessary schemas in BanyanDB for the performance test
+2. Validate schema definitions before running tests
+3. Reference the exact structure used in the performance comparison
+
+## Data Structure
+
+Both schemas are designed to store SkyWalking segment data (spans) with the 
following fields:
+- `serviceId`, `serviceInstanceId` - Service identification
+- `traceId`, `spanId`, `parentSpanId` - Trace and span identification
+- `startTime`, `latency` - Temporal information
+- `operationName`, `component` - Semantic information
+- `isError` - Error status
+- `dataBinary` - Serialized span data
diff --git a/test/stress/stream-vs-trace/testdata/schema/group.json 
b/test/stress/stream-vs-trace/testdata/schema/group.json
new file mode 100644
index 00000000..1bc37b6b
--- /dev/null
+++ b/test/stress/stream-vs-trace/testdata/schema/group.json
@@ -0,0 +1,36 @@
+[
+  {
+    "metadata": {
+      "name": "stream_performance_test"
+    },
+    "catalog": "CATALOG_STREAM",
+    "resource_opts": {
+      "shard_num": 2,
+      "segment_interval": {
+        "unit": "UNIT_DAY",
+        "num": 1
+      },
+      "ttl": {
+        "unit": "UNIT_DAY",
+        "num": 7
+      }
+    }
+  },
+  {
+    "metadata": {
+      "name": "trace_performance_test"
+    },
+    "catalog": "CATALOG_TRACE",
+    "resource_opts": {
+      "shard_num": 2,
+      "segment_interval": {
+        "unit": "UNIT_DAY",
+        "num": 1
+      },
+      "ttl": {
+        "unit": "UNIT_DAY",
+        "num": 7
+      }
+    }
+  }
+]
diff --git 
a/test/stress/stream-vs-trace/testdata/schema/stream_index_rule_bindings.json 
b/test/stress/stream-vs-trace/testdata/schema/stream_index_rule_bindings.json
new file mode 100644
index 00000000..cf0ae432
--- /dev/null
+++ 
b/test/stress/stream-vs-trace/testdata/schema/stream_index_rule_bindings.json
@@ -0,0 +1,23 @@
+[
+  {
+    "metadata": {
+      "name": "segment_stream_binding",
+      "group": "stream_performance_test"
+    },
+    "rules": [
+      "latency_index",
+      "trace_id_index",
+      "span_id_index",
+      "parent_span_id_index",
+      "operation_name_index",
+      "component_index",
+      "is_error_index"
+    ],
+    "subject": {
+      "name": "segment_stream",
+      "catalog": "CATALOG_STREAM"
+    },
+    "begin_at": "2024-01-01T00:00:00Z",
+    "expire_at": "2030-01-01T00:00:00Z"
+  }
+]
diff --git 
a/test/stress/stream-vs-trace/testdata/schema/stream_index_rules.json 
b/test/stress/stream-vs-trace/testdata/schema/stream_index_rules.json
new file mode 100644
index 00000000..fcecafd1
--- /dev/null
+++ b/test/stress/stream-vs-trace/testdata/schema/stream_index_rules.json
@@ -0,0 +1,58 @@
+[
+  {
+    "metadata": {
+      "name": "latency_index",
+      "group": "stream_performance_test"
+    },
+    "tags": ["latency"],
+    "type": "TYPE_INVERTED"
+  },
+  {
+    "metadata": {
+      "name": "trace_id_index",
+      "group": "stream_performance_test"
+    },
+    "tags": ["traceId"],
+    "type": "TYPE_INVERTED"
+  },
+  {
+    "metadata": {
+      "name": "span_id_index",
+      "group": "stream_performance_test"
+    },
+    "tags": ["spanId"],
+    "type": "TYPE_INVERTED"
+  },
+  {
+    "metadata": {
+      "name": "parent_span_id_index",
+      "group": "stream_performance_test"
+    },
+    "tags": ["parentSpanId"],
+    "type": "TYPE_INVERTED"
+  },
+  {
+    "metadata": {
+      "name": "operation_name_index",
+      "group": "stream_performance_test"
+    },
+    "tags": ["operationName"],
+    "type": "TYPE_INVERTED"
+  },
+  {
+    "metadata": {
+      "name": "component_index",
+      "group": "stream_performance_test"
+    },
+    "tags": ["component"],
+    "type": "TYPE_INVERTED"
+  },
+  {
+    "metadata": {
+      "name": "is_error_index",
+      "group": "stream_performance_test"
+    },
+    "tags": ["isError"],
+    "type": "TYPE_INVERTED"
+  }
+]
diff --git a/test/stress/stream-vs-trace/testdata/schema/stream_schema.json 
b/test/stress/stream-vs-trace/testdata/schema/stream_schema.json
new file mode 100644
index 00000000..cbc323e3
--- /dev/null
+++ b/test/stress/stream-vs-trace/testdata/schema/stream_schema.json
@@ -0,0 +1,65 @@
+{
+  "metadata": {
+    "name": "segment_stream",
+    "group": "stream_performance_test"
+  },
+  "entity": {
+    "tag_names": ["serviceId", "serviceInstanceId"]
+  },
+  "tag_families": [
+    {
+      "name": "primary",
+      "tags": [
+        {
+          "name": "serviceId",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "serviceInstanceId",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "traceId",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "startTime",
+          "type": "TAG_TYPE_INT"
+        },
+        {
+          "name": "latency",
+          "type": "TAG_TYPE_INT"
+        },
+        {
+          "name": "isError",
+          "type": "TAG_TYPE_INT"
+        },
+        {
+          "name": "spanId",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "parentSpanId",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "operationName",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "component",
+          "type": "TAG_TYPE_STRING"
+        }
+      ]
+    },
+    {
+      "name": "data_binary",
+      "tags": [
+        {
+          "name": "dataBinary",
+          "type": "TAG_TYPE_DATA_BINARY"
+        }
+      ]
+    }
+  ]
+}
diff --git 
a/test/stress/stream-vs-trace/testdata/schema/trace_index_rule_bindings.json 
b/test/stress/stream-vs-trace/testdata/schema/trace_index_rule_bindings.json
new file mode 100644
index 00000000..f1f3da46
--- /dev/null
+++ b/test/stress/stream-vs-trace/testdata/schema/trace_index_rule_bindings.json
@@ -0,0 +1,18 @@
+[
+  {
+    "metadata": {
+      "name": "segment_trace_binding",
+      "group": "trace_performance_test"
+    },
+    "rules": [
+      "time_based_index",
+      "latency_based_index"
+    ],
+    "subject": {
+      "name": "segment_trace",
+      "catalog": "CATALOG_TRACE"
+    },
+    "begin_at": "2024-01-01T00:00:00Z",
+    "expire_at": "2030-01-01T00:00:00Z"
+  }
+]
diff --git a/test/stress/stream-vs-trace/testdata/schema/trace_index_rules.json 
b/test/stress/stream-vs-trace/testdata/schema/trace_index_rules.json
new file mode 100644
index 00000000..106d73eb
--- /dev/null
+++ b/test/stress/stream-vs-trace/testdata/schema/trace_index_rules.json
@@ -0,0 +1,18 @@
+[
+  {
+    "metadata": {
+      "name": "time_based_index",
+      "group": "trace_performance_test"
+    },
+    "tags": ["serviceId", "serviceInstanceId", "startTime"],
+    "type": "TYPE_INVERTED"
+  },
+  {
+    "metadata": {
+      "name": "latency_based_index",
+      "group": "trace_performance_test"
+    },
+    "tags": ["serviceId", "serviceInstanceId", "latency"],
+    "type": "TYPE_INVERTED"
+  }
+]
diff --git a/test/stress/stream-vs-trace/testdata/schema/trace_schema.json 
b/test/stress/stream-vs-trace/testdata/schema/trace_schema.json
new file mode 100644
index 00000000..2000cc53
--- /dev/null
+++ b/test/stress/stream-vs-trace/testdata/schema/trace_schema.json
@@ -0,0 +1,54 @@
+{
+  "metadata": {
+    "name": "segment_trace",
+    "group": "trace_performance_test"
+  },
+  "tags": [
+    {
+      "name": "serviceId",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "serviceInstanceId",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "traceId",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "startTime",
+      "type": "TAG_TYPE_INT"
+    },
+    {
+      "name": "latency",
+      "type": "TAG_TYPE_INT"
+    },
+    {
+      "name": "isError",
+      "type": "TAG_TYPE_INT"
+    },
+    {
+      "name": "spanId",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "parentSpanId",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "operationName",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "component",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "dataBinary",
+      "type": "TAG_TYPE_DATA_BINARY"
+    }
+  ],
+  "trace_id_tag_name": "traceId",
+  "timestamp_tag_name": "startTime"
+}
diff --git a/test/stress/stream-vs-trace/trace_client.go 
b/test/stress/stream-vs-trace/trace_client.go
new file mode 100644
index 00000000..f3bd04ef
--- /dev/null
+++ b/test/stress/stream-vs-trace/trace_client.go
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package streamvstrace
+
+import (
+       "context"
+       "fmt"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+)
+
+// TraceClient bundles the gRPC clients used to reach the trace service and
+// the schema registries (trace, group, index rule, index rule binding).
+type TraceClient struct {
+       // serviceClient is the data-plane client used by Write and Query.
+       serviceClient tracev1.TraceServiceClient
+       // registryClient looks up trace schemas.
+       registryClient databasev1.TraceRegistryServiceClient
+       // groupClient looks up groups.
+       groupClient databasev1.GroupRegistryServiceClient
+       // indexRuleClient looks up index rules.
+       indexRuleClient databasev1.IndexRuleRegistryServiceClient
+       // indexRuleBindingClient looks up index rule bindings.
+       indexRuleBindingClient databasev1.IndexRuleBindingRegistryServiceClient
+}
+
+// NewTraceClient builds a TraceClient whose service and registry clients all
+// share the supplied gRPC connection.
+func NewTraceClient(conn *grpc.ClientConn) *TraceClient {
+       client := new(TraceClient)
+       client.serviceClient = tracev1.NewTraceServiceClient(conn)
+       client.registryClient = databasev1.NewTraceRegistryServiceClient(conn)
+       client.groupClient = databasev1.NewGroupRegistryServiceClient(conn)
+       client.indexRuleClient = databasev1.NewIndexRuleRegistryServiceClient(conn)
+       client.indexRuleBindingClient = databasev1.NewIndexRuleBindingRegistryServiceClient(conn)
+       return client
+}
+
+// VerifySchema reports whether the trace schema identified by group and name
+// exists. A NotFound status from the registry yields (false, nil); any other
+// error is wrapped and returned.
+func (c *TraceClient) VerifySchema(ctx context.Context, group, name string) (bool, error) {
+       _, getErr := c.registryClient.Get(ctx, &databasev1.TraceRegistryServiceGetRequest{
+               Metadata: &commonv1.Metadata{Group: group, Name: name},
+       })
+       switch {
+       case getErr == nil:
+               return true, nil
+       case status.Code(getErr) == codes.NotFound:
+               // A missing schema is an answer, not a failure.
+               return false, nil
+       default:
+               return false, fmt.Errorf("failed to get trace schema: %w", getErr)
+       }
+}
+
+// Write opens a Write stream on the trace service and returns it together
+// with any error from opening it. The *tracev1.WriteRequest argument is bound
+// to the blank identifier and therefore ignored; nothing is sent on the
+// stream here.
+func (c *TraceClient) Write(ctx context.Context, _ *tracev1.WriteRequest) 
(tracev1.TraceService_WriteClient, error) {
+       return c.serviceClient.Write(ctx)
+}
+
+// Query forwards req to the trace service's Query call and returns the
+// response and error exactly as the underlying client reports them.
+func (c *TraceClient) Query(ctx context.Context, req *tracev1.QueryRequest) 
(*tracev1.QueryResponse, error) {
+       return c.serviceClient.Query(ctx, req)
+}
+
+// VerifyGroup reports whether the named group exists. A NotFound status from
+// the group registry yields (false, nil); any other error is wrapped and
+// returned.
+func (c *TraceClient) VerifyGroup(ctx context.Context, group string) (bool, error) {
+       _, getErr := c.groupClient.Get(ctx, &databasev1.GroupRegistryServiceGetRequest{Group: group})
+       switch {
+       case getErr == nil:
+               return true, nil
+       case status.Code(getErr) == codes.NotFound:
+               // A missing group is an answer, not a failure.
+               return false, nil
+       default:
+               return false, fmt.Errorf("failed to get group: %w", getErr)
+       }
+}
+
+// VerifyIndexRule reports whether the index rule identified by group and name
+// exists. A NotFound status from the registry yields (false, nil); any other
+// error is wrapped and returned.
+func (c *TraceClient) VerifyIndexRule(ctx context.Context, group, name string) (bool, error) {
+       _, getErr := c.indexRuleClient.Get(ctx, &databasev1.IndexRuleRegistryServiceGetRequest{
+               Metadata: &commonv1.Metadata{Group: group, Name: name},
+       })
+       switch {
+       case getErr == nil:
+               return true, nil
+       case status.Code(getErr) == codes.NotFound:
+               // A missing rule is an answer, not a failure.
+               return false, nil
+       default:
+               return false, fmt.Errorf("failed to get index rule: %w", getErr)
+       }
+}
+
+// VerifyIndexRuleBinding checks if an index rule binding exists.
+func (c *TraceClient) VerifyIndexRuleBinding(ctx context.Context, group, name 
string) (bool, error) {
+       req := &databasev1.IndexRuleBindingRegistryServiceGetRequest{
+               Metadata: &commonv1.Metadata{
+                       Group: group,
+                       Name:  name,
+               },
+       }
+
+       _, err := c.indexRuleBindingClient.Get(ctx, req)
+       if err != nil {
+               if status.Code(err) == codes.NotFound {
+                       return false, nil
+               }
+               return false, fmt.Errorf("failed to get index rule binding: 
%w", err)
+       }
+
+       return true, nil
+}


Reply via email to