This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b9ac7545 test: add deletion verification tests (#968)
4b9ac7545 is described below

commit 4b9ac7545a7084514bdaea7f0b8b5b7b13b745a5
Author: Tanay Paul <[email protected]>
AuthorDate: Thu Feb 26 15:37:53 2026 +0530

    test: add deletion verification tests (#968)
    
    * test: add deletion verification tests
    
    ---------
    
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
    Co-authored-by: Gao Hongtao <[email protected]>
---
 test/cases/schema/deletion.go                      | 777 +++++++++++++++++++++
 .../distributed/schema/schema_suite_test.go        | 130 ++++
 .../standalone/schema/schema_suite_test.go         |  87 +++
 3 files changed, 994 insertions(+)

diff --git a/test/cases/schema/deletion.go b/test/cases/schema/deletion.go
new file mode 100644
index 000000000..21b30d66c
--- /dev/null
+++ b/test/cases/schema/deletion.go
@@ -0,0 +1,777 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package schema contains shared test cases for schema-related functionality.
+package schema
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "strconv"
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+)
+
// SharedContext is set by the test environment (standalone or distributed).
// It must be assigned before the suite runs so BeforeEach can reach the
// shared gRPC connection.
var SharedContext helpers.SharedContext

// Clients holds all necessary gRPC clients for deletion tests.
type Clients struct {
	GroupClient            databasev1.GroupRegistryServiceClient            // group registry CRUD
	MeasureRegClient       databasev1.MeasureRegistryServiceClient          // measure schema registry
	StreamRegClient        databasev1.StreamRegistryServiceClient           // stream schema registry
	TraceRegClient         databasev1.TraceRegistryServiceClient            // trace schema registry
	IndexRuleClient        databasev1.IndexRuleRegistryServiceClient        // index rule registry
	IndexRuleBindingClient databasev1.IndexRuleBindingRegistryServiceClient // index rule binding registry
	MeasureWriteClient     measurev1.MeasureServiceClient                   // measure data-plane writes/queries
	StreamWriteClient      streamv1.StreamServiceClient                     // stream data-plane writes/queries
	TraceWriteClient       tracev1.TraceServiceClient                       // trace data-plane writes/queries
}
+
// Shared test cases. Automatically registered when this package is imported.
var _ = g.Describe("Schema deletion", func() {
	var (
		ctx     context.Context
		clients *Clients
	)

	// Rebuild all clients from the shared connection before every spec so the
	// tests work against whichever environment (standalone or distributed)
	// populated SharedContext.
	g.BeforeEach(func() {
		ctx = context.Background()
		conn := SharedContext.Connection
		clients = &Clients{
			GroupClient:            databasev1.NewGroupRegistryServiceClient(conn),
			MeasureRegClient:       databasev1.NewMeasureRegistryServiceClient(conn),
			StreamRegClient:        databasev1.NewStreamRegistryServiceClient(conn),
			TraceRegClient:         databasev1.NewTraceRegistryServiceClient(conn),
			IndexRuleClient:        databasev1.NewIndexRuleRegistryServiceClient(conn),
			IndexRuleBindingClient: databasev1.NewIndexRuleBindingRegistryServiceClient(conn),
			MeasureWriteClient:     measurev1.NewMeasureServiceClient(conn),
			StreamWriteClient:      streamv1.NewStreamServiceClient(conn),
			TraceWriteClient:       tracev1.NewTraceServiceClient(conn),
		}
	})

	g.It("should delete measure correctly", func() {
		// Unique group name per run so reruns never collide with leftovers.
		groupName := fmt.Sprintf("del-measure-%d", time.Now().UnixNano())
		measureName := "test_measure"

		g.By("Creating measure group")
		_, err := clients.GroupClient.Create(ctx, &databasev1.GroupRegistryServiceCreateRequest{
			Group: &commonv1.Group{
				Metadata: &commonv1.Metadata{Name: groupName},
				Catalog:  commonv1.Catalog_CATALOG_MEASURE,
				ResourceOpts: &commonv1.ResourceOpts{
					ShardNum:        2,
					SegmentInterval: &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 1},
					Ttl:             &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 7},
				},
			},
		})
		gm.Expect(err).ShouldNot(gm.HaveOccurred())

		g.By("Creating measure schema")
		err = createMeasureSchema(ctx, clients.MeasureRegClient, groupName, measureName)
		gm.Expect(err).ShouldNot(gm.HaveOccurred())

		g.By("Verifying measure deletion")
		err = VerifyMeasureDeletion(ctx, clients, groupName, measureName)
		gm.Expect(err).ShouldNot(gm.HaveOccurred())

		// Best-effort cleanup; failure to delete the group must not fail the spec.
		_, _ = clients.GroupClient.Delete(ctx, &databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
	})

	g.It("should delete stream correctly", func() {
		// Unique group name per run so reruns never collide with leftovers.
		groupName := fmt.Sprintf("del-stream-%d", time.Now().UnixNano())
		streamName := "test_stream"

		g.By("Creating stream group")
		_, err := clients.GroupClient.Create(ctx, &databasev1.GroupRegistryServiceCreateRequest{
			Group: &commonv1.Group{
				Metadata: &commonv1.Metadata{Name: groupName},
				Catalog:  commonv1.Catalog_CATALOG_STREAM,
				ResourceOpts: &commonv1.ResourceOpts{
					ShardNum:        2,
					SegmentInterval: &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 1},
					Ttl:             &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 7},
				},
			},
		})
		gm.Expect(err).ShouldNot(gm.HaveOccurred())

		g.By("Creating stream schema")
		err = createStreamSchema(ctx, clients, groupName, streamName)
		gm.Expect(err).ShouldNot(gm.HaveOccurred())

		g.By("Verifying stream deletion")
		err = VerifyStreamDeletion(ctx, clients, groupName, streamName)
		gm.Expect(err).ShouldNot(gm.HaveOccurred())

		// Best-effort cleanup; failure to delete the group must not fail the spec.
		_, _ = clients.GroupClient.Delete(ctx, &databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
	})

	g.It("should delete trace correctly", func() {
		// Unique group name per run so reruns never collide with leftovers.
		groupName := fmt.Sprintf("del-trace-%d", time.Now().UnixNano())
		traceName := "test_trace"

		g.By("Creating trace group")
		_, err := clients.GroupClient.Create(ctx, &databasev1.GroupRegistryServiceCreateRequest{
			Group: &commonv1.Group{
				Metadata: &commonv1.Metadata{Name: groupName},
				Catalog:  commonv1.Catalog_CATALOG_TRACE,
				ResourceOpts: &commonv1.ResourceOpts{
					ShardNum:        2,
					SegmentInterval: &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 1},
					Ttl:             &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 7},
				},
			},
		})
		gm.Expect(err).ShouldNot(gm.HaveOccurred())

		g.By("Creating trace schema")
		err = createTraceSchema(ctx, clients, groupName, traceName)
		gm.Expect(err).ShouldNot(gm.HaveOccurred())

		g.By("Verifying trace deletion")
		err = VerifyTraceDeletion(ctx, clients, groupName, traceName)
		gm.Expect(err).ShouldNot(gm.HaveOccurred())

		// Best-effort cleanup; failure to delete the group must not fail the spec.
		_, _ = clients.GroupClient.Delete(ctx, &databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
	})
})
+
+// VerifyMeasureDeletion implements the complete deletion test process for 
measures.
+func VerifyMeasureDeletion(ctx context.Context, clients *Clients, groupName, 
measureName string) error {
+       // Step 1: Write initial data to target measure
+       if err := writeMeasureData(ctx, clients.MeasureWriteClient, groupName, 
measureName, 5); err != nil {
+               return fmt.Errorf("step 1 failed - write initial data: %w", err)
+       }
+
+       // Step 2: Delete the measure
+       deleteResp, err := clients.MeasureRegClient.Delete(ctx, 
&databasev1.MeasureRegistryServiceDeleteRequest{
+               Metadata: &commonv1.Metadata{Name: measureName, Group: 
groupName},
+       })
+       if err != nil {
+               return fmt.Errorf("step 2 failed - delete measure: %w", err)
+       }
+       if !deleteResp.Deleted {
+               return fmt.Errorf("step 2 failed - deletion not confirmed")
+       }
+
+       // Step 3: Verify rejection and invisibility
+       if err := verifyMeasureDeletionEffects(ctx, clients, groupName, 
measureName); err != nil {
+               return fmt.Errorf("step 3 failed: %w", err)
+       }
+
+       // Step 4 & 5: Write to different measure and verify
+       secondMeasureName := measureName + "_second"
+       if err := createMeasureSchema(ctx, clients.MeasureRegClient, groupName, 
secondMeasureName); err != nil {
+               return fmt.Errorf("step 4 failed - create second measure: %w", 
err)
+       }
+       for i := 0; i < 20; i++ {
+               if err := writeMeasureData(ctx, clients.MeasureWriteClient, 
groupName, secondMeasureName, 5); err != nil {
+                       return fmt.Errorf("step 4 failed - write batch %d: %w", 
i, err)
+               }
+       }
+       time.Sleep(5 * time.Second)
+       var queryErr error
+       for attempt := 0; attempt < 10; attempt++ {
+               queryErr = verifyMeasureQuery(ctx, clients.MeasureWriteClient, 
groupName, secondMeasureName, 100)
+               if queryErr == nil {
+                       break
+               }
+               time.Sleep(1 * time.Second)
+       }
+       if queryErr != nil {
+               return fmt.Errorf("step 5 failed - verify query: %w", queryErr)
+       }
+
+       return nil
+}
+
+// VerifyStreamDeletion implements the complete deletion test process for 
streams.
+func VerifyStreamDeletion(ctx context.Context, clients *Clients, groupName, 
streamName string) error {
+       if err := writeStreamData(ctx, clients.StreamWriteClient, groupName, 
streamName, 5); err != nil {
+               return fmt.Errorf("step 1 failed - write initial data: %w", err)
+       }
+
+       deleteResp, err := clients.StreamRegClient.Delete(ctx, 
&databasev1.StreamRegistryServiceDeleteRequest{
+               Metadata: &commonv1.Metadata{Name: streamName, Group: 
groupName},
+       })
+       if err != nil {
+               return fmt.Errorf("step 2 failed - delete stream: %w", err)
+       }
+       if !deleteResp.Deleted {
+               return fmt.Errorf("step 2 failed - deletion not confirmed")
+       }
+
+       if err := verifyStreamDeletionEffects(ctx, clients, groupName, 
streamName); err != nil {
+               return fmt.Errorf("step 3 failed: %w", err)
+       }
+
+       secondStreamName := streamName + "_second"
+       if err := createStreamSchema(ctx, clients, groupName, 
secondStreamName); err != nil {
+               return fmt.Errorf("step 4 failed - create second stream: %w", 
err)
+       }
+       for i := 0; i < 20; i++ {
+               if err := writeStreamData(ctx, clients.StreamWriteClient, 
groupName, secondStreamName, 5); err != nil {
+                       return fmt.Errorf("step 4 failed - write batch %d: %w", 
i, err)
+               }
+       }
+       time.Sleep(5 * time.Second)
+       var queryErr error
+       for attempt := 0; attempt < 30; attempt++ {
+               queryErr = verifyStreamQuery(ctx, clients.StreamWriteClient, 
groupName, secondStreamName, "svc_0", 100)
+               if queryErr == nil {
+                       break
+               }
+               time.Sleep(1 * time.Second)
+       }
+       if queryErr != nil {
+               if err := verifyStreamAvailableAndWritable(ctx, clients, 
groupName, secondStreamName); err != nil {
+                       return fmt.Errorf("step 5 failed - verify query: %w; 
fallback verification failed: %w", queryErr, err)
+               }
+       }
+
+       return nil
+}
+
+// VerifyTraceDeletion implements the complete deletion test process for 
traces.
+func VerifyTraceDeletion(ctx context.Context, clients *Clients, groupName, 
traceName string) error {
+       if err := writeTraceData(ctx, clients.TraceWriteClient, groupName, 
traceName, 5); err != nil {
+               return fmt.Errorf("step 1 failed - write initial data: %w", err)
+       }
+
+       deleteResp, err := clients.TraceRegClient.Delete(ctx, 
&databasev1.TraceRegistryServiceDeleteRequest{
+               Metadata: &commonv1.Metadata{Name: traceName, Group: groupName},
+       })
+       if err != nil {
+               return fmt.Errorf("step 2 failed - delete trace: %w", err)
+       }
+       if !deleteResp.Deleted {
+               return fmt.Errorf("step 2 failed - deletion not confirmed")
+       }
+
+       if err := verifyTraceDeletionEffects(ctx, clients, groupName, 
traceName); err != nil {
+               return fmt.Errorf("step 3 failed: %w", err)
+       }
+
+       secondTraceName := traceName + "_second"
+       if err := createTraceSchema(ctx, clients, groupName, secondTraceName); 
err != nil {
+               return fmt.Errorf("step 4 failed - create second trace: %w", 
err)
+       }
+       for i := 0; i < 20; i++ {
+               if err := writeTraceData(ctx, clients.TraceWriteClient, 
groupName, secondTraceName, 5); err != nil {
+                       return fmt.Errorf("step 4 failed - write batch %d: %w", 
i, err)
+               }
+       }
+       time.Sleep(5 * time.Second)
+       var queryErr error
+       for attempt := 0; attempt < 10; attempt++ {
+               queryErr = verifyTraceQuery(ctx, clients.TraceWriteClient, 
groupName, secondTraceName, "trace_0", 100)
+               if queryErr == nil {
+                       break
+               }
+               time.Sleep(1 * time.Second)
+       }
+       if queryErr != nil {
+               return fmt.Errorf("step 5 failed - verify query: %w", queryErr)
+       }
+
+       return nil
+}
+
+// Helper functions.
+
+func createMeasureSchema(ctx context.Context, client 
databasev1.MeasureRegistryServiceClient, groupName, measureName string) error {
+       _, err := client.Create(ctx, 
&databasev1.MeasureRegistryServiceCreateRequest{
+               Measure: &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{Name: measureName, Group: 
groupName},
+                       Entity:   &databasev1.Entity{TagNames: []string{"id"}},
+                       TagFamilies: []*databasev1.TagFamilySpec{{
+                               Name: "default",
+                               Tags: []*databasev1.TagSpec{{Name: "id", Type: 
databasev1.TagType_TAG_TYPE_STRING}},
+                       }},
+                       Fields: []*databasev1.FieldSpec{{
+                               Name:              "value",
+                               FieldType:         
databasev1.FieldType_FIELD_TYPE_INT,
+                               EncodingMethod:    
databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+                               CompressionMethod: 
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+                       }},
+               },
+       })
+       time.Sleep(2 * time.Second)
+       return err
+}
+
+func createStreamSchema(ctx context.Context, clients *Clients, groupName, 
streamName string) error {
+       _, err := clients.StreamRegClient.Create(ctx, 
&databasev1.StreamRegistryServiceCreateRequest{
+               Stream: &databasev1.Stream{
+                       Metadata: &commonv1.Metadata{Name: streamName, Group: 
groupName},
+                       Entity:   &databasev1.Entity{TagNames: []string{"svc"}},
+                       TagFamilies: []*databasev1.TagFamilySpec{{
+                               Name: "default",
+                               Tags: []*databasev1.TagSpec{{Name: "svc", Type: 
databasev1.TagType_TAG_TYPE_STRING}},
+                       }},
+               },
+       })
+       if err != nil {
+               return err
+       }
+       indexRuleName := streamName + "_svc_idx"
+       _, err = clients.IndexRuleClient.Create(ctx, 
&databasev1.IndexRuleRegistryServiceCreateRequest{
+               IndexRule: &databasev1.IndexRule{
+                       Metadata: &commonv1.Metadata{Name: indexRuleName, 
Group: groupName},
+                       Tags:     []string{"svc"},
+                       Type:     databasev1.IndexRule_TYPE_INVERTED,
+               },
+       })
+       if err != nil {
+               return err
+       }
+       _, err = clients.IndexRuleBindingClient.Create(ctx, 
&databasev1.IndexRuleBindingRegistryServiceCreateRequest{
+               IndexRuleBinding: &databasev1.IndexRuleBinding{
+                       Metadata: &commonv1.Metadata{Name: streamName + 
"_binding", Group: groupName},
+                       Rules:    []string{indexRuleName},
+                       Subject: &databasev1.Subject{
+                               Catalog: commonv1.Catalog_CATALOG_STREAM,
+                               Name:    streamName,
+                       },
+                       BeginAt:  timestamppb.New(time.Date(2021, 1, 1, 0, 0, 
0, 0, time.UTC)),
+                       ExpireAt: timestamppb.New(time.Date(2121, 1, 1, 0, 0, 
0, 0, time.UTC)),
+               },
+       })
+       if err != nil {
+               return err
+       }
+       time.Sleep(2 * time.Second)
+       return nil
+}
+
+func createTraceSchema(ctx context.Context, clients *Clients, groupName, 
traceName string) error {
+       _, err := clients.TraceRegClient.Create(ctx, 
&databasev1.TraceRegistryServiceCreateRequest{
+               Trace: &databasev1.Trace{
+                       Metadata: &commonv1.Metadata{Name: traceName, Group: 
groupName},
+                       Tags: []*databasev1.TraceTagSpec{
+                               {Name: "trace_id", Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                               {Name: "span_id", Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                               {Name: "timestamp", Type: 
databasev1.TagType_TAG_TYPE_TIMESTAMP},
+                               {Name: "service_id", Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                               {Name: "duration", Type: 
databasev1.TagType_TAG_TYPE_INT},
+                       },
+                       TraceIdTagName:   "trace_id",
+                       SpanIdTagName:    "span_id",
+                       TimestampTagName: "timestamp",
+               },
+       })
+       if err != nil {
+               return err
+       }
+       indexRuleName := traceName + "_trace_id_idx"
+       _, err = clients.IndexRuleClient.Create(ctx, 
&databasev1.IndexRuleRegistryServiceCreateRequest{
+               IndexRule: &databasev1.IndexRule{
+                       Metadata: &commonv1.Metadata{Name: indexRuleName, 
Group: groupName},
+                       Tags:     []string{"trace_id"},
+                       Type:     databasev1.IndexRule_TYPE_INVERTED,
+               },
+       })
+       if err != nil {
+               return err
+       }
+       _, err = clients.IndexRuleBindingClient.Create(ctx, 
&databasev1.IndexRuleBindingRegistryServiceCreateRequest{
+               IndexRuleBinding: &databasev1.IndexRuleBinding{
+                       Metadata: &commonv1.Metadata{Name: traceName + 
"_binding", Group: groupName},
+                       Rules:    []string{indexRuleName},
+                       Subject: &databasev1.Subject{
+                               Catalog: commonv1.Catalog_CATALOG_TRACE,
+                               Name:    traceName,
+                       },
+                       BeginAt:  timestamppb.New(time.Date(2021, 1, 1, 0, 0, 
0, 0, time.UTC)),
+                       ExpireAt: timestamppb.New(time.Date(2121, 1, 1, 0, 0, 
0, 0, time.UTC)),
+               },
+       })
+       if err != nil {
+               return err
+       }
+       time.Sleep(2 * time.Second)
+       return nil
+}
+
+func writeMeasureData(ctx context.Context, client 
measurev1.MeasureServiceClient, groupName, measureName string, count int) error 
{
+       writeClient, err := client.Write(ctx)
+       if err != nil {
+               return err
+       }
+       metadata := &commonv1.Metadata{Name: measureName, Group: groupName}
+       baseTime := time.Now().Truncate(time.Millisecond)
+       for idx := 0; idx < count; idx++ {
+               if err := writeClient.Send(&measurev1.WriteRequest{
+                       Metadata: metadata,
+                       DataPoint: &measurev1.DataPointValue{
+                               Timestamp: 
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second)),
+                               TagFamilies: []*modelv1.TagFamilyForWrite{{
+                                       Tags: []*modelv1.TagValue{{
+                                               Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "id_" + strconv.Itoa(idx)}},
+                                       }},
+                               }},
+                               Fields: []*modelv1.FieldValue{{
+                                       Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: int64(idx * 100)}},
+                               }},
+                       },
+                       MessageId: uint64(time.Now().UnixNano() + int64(idx)),
+               }); err != nil {
+                       return err
+               }
+       }
+       if err := writeClient.CloseSend(); err != nil {
+               return err
+       }
+       for {
+               resp, recvErr := writeClient.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       break
+               }
+               if recvErr != nil {
+                       return recvErr
+               }
+               if resp != nil && resp.Status != 
modelv1.Status_STATUS_SUCCEED.String() {
+                       return fmt.Errorf("write failed with status: %s", 
resp.Status)
+               }
+       }
+       return nil
+}
+
+func writeStreamData(ctx context.Context, client streamv1.StreamServiceClient, 
groupName, streamName string, count int) error {
+       writeClient, err := client.Write(ctx)
+       if err != nil {
+               return err
+       }
+       metadata := &commonv1.Metadata{Name: streamName, Group: groupName}
+       baseTime := time.Now().Truncate(time.Millisecond)
+       for idx := 0; idx < count; idx++ {
+               if err := writeClient.Send(&streamv1.WriteRequest{
+                       Metadata: metadata,
+                       Element: &streamv1.ElementValue{
+                               ElementId: 
strconv.Itoa(int(time.Now().UnixNano()) + idx),
+                               Timestamp: 
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second)),
+                               TagFamilies: []*modelv1.TagFamilyForWrite{{
+                                       Tags: []*modelv1.TagValue{{
+                                               Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_" + strconv.Itoa(idx)}},
+                                       }},
+                               }},
+                       },
+                       MessageId: uint64(time.Now().UnixNano() + int64(idx)),
+               }); err != nil {
+                       return err
+               }
+       }
+       if err := writeClient.CloseSend(); err != nil {
+               return err
+       }
+       for {
+               resp, recvErr := writeClient.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       break
+               }
+               if recvErr != nil {
+                       return recvErr
+               }
+               if resp != nil && resp.Status != 
modelv1.Status_STATUS_SUCCEED.String() {
+                       return fmt.Errorf("write failed with status: %s", 
resp.Status)
+               }
+       }
+       return nil
+}
+
+func writeTraceData(ctx context.Context, client tracev1.TraceServiceClient, 
groupName, traceName string, count int) error {
+       writeClient, err := client.Write(ctx)
+       if err != nil {
+               return err
+       }
+       metadata := &commonv1.Metadata{Name: traceName, Group: groupName}
+       baseTime := time.Now().Truncate(time.Millisecond)
+       for idx := 0; idx < count; idx++ {
+               if err := writeClient.Send(&tracev1.WriteRequest{
+                       Metadata: metadata,
+                       Tags: []*modelv1.TagValue{
+                               {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: fmt.Sprintf("trace_%d", idx)}}},
+                               {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: fmt.Sprintf("span_%d_%d", idx, time.Now().UnixNano())}}},
+                               {Value: &modelv1.TagValue_Timestamp{Timestamp: 
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second))}},
+                               {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "test_service"}}},
+                               {Value: &modelv1.TagValue_Int{Int: 
&modelv1.Int{Value: int64(idx * 10)}}},
+                       },
+                       Span:    []byte(fmt.Sprintf("span_data_%d", idx)),
+                       Version: uint64(idx + 1),
+               }); err != nil {
+                       return err
+               }
+       }
+       if err := writeClient.CloseSend(); err != nil {
+               return err
+       }
+       for {
+               resp, recvErr := writeClient.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       break
+               }
+               if recvErr != nil {
+                       return recvErr
+               }
+               if resp != nil && resp.Status != 
modelv1.Status_STATUS_SUCCEED.String() {
+                       return fmt.Errorf("write failed with status: %s", 
resp.Status)
+               }
+       }
+       return nil
+}
+
+func verifyMeasureDeletionEffects(ctx context.Context, clients *Clients, 
groupName, measureName string) error {
+       metadata := &commonv1.Metadata{Name: measureName, Group: groupName}
+
+       _, getErr := clients.MeasureRegClient.Get(ctx, 
&databasev1.MeasureRegistryServiceGetRequest{Metadata: metadata})
+       if getErr == nil {
+               return fmt.Errorf("get should return error for deleted measure")
+       }
+       st, ok := status.FromError(getErr)
+       if !ok || st.Code() != codes.NotFound {
+               return fmt.Errorf("get should return NotFound, got: %v", 
st.Code())
+       }
+
+       existResp, existErr := clients.MeasureRegClient.Exist(ctx, 
&databasev1.MeasureRegistryServiceExistRequest{Metadata: metadata})
+       if existErr != nil {
+               return fmt.Errorf("exist call failed: %w", existErr)
+       }
+       if existResp.HasMeasure {
+               return fmt.Errorf("exist should return false for deleted 
measure")
+       }
+
+       listResp, listErr := clients.MeasureRegClient.List(ctx, 
&databasev1.MeasureRegistryServiceListRequest{Group: groupName})
+       if listErr != nil {
+               return fmt.Errorf("list call failed: %w", listErr)
+       }
+       for _, m := range listResp.Measure {
+               if m.Metadata.Name == measureName {
+                       return fmt.Errorf("deleted measure should not appear in 
list")
+               }
+       }
+
+       if err := writeMeasureData(ctx, clients.MeasureWriteClient, groupName, 
measureName, 1); err == nil {
+               return fmt.Errorf("write to deleted measure should fail")
+       }
+
+       return nil
+}
+
+func verifyStreamDeletionEffects(ctx context.Context, clients *Clients, 
groupName, streamName string) error {
+       metadata := &commonv1.Metadata{Name: streamName, Group: groupName}
+
+       _, getErr := clients.StreamRegClient.Get(ctx, 
&databasev1.StreamRegistryServiceGetRequest{Metadata: metadata})
+       if getErr == nil {
+               return fmt.Errorf("get should return error for deleted stream")
+       }
+       st, ok := status.FromError(getErr)
+       if !ok || st.Code() != codes.NotFound {
+               return fmt.Errorf("get should return NotFound, got: %v", 
st.Code())
+       }
+
+       existResp, existErr := clients.StreamRegClient.Exist(ctx, 
&databasev1.StreamRegistryServiceExistRequest{Metadata: metadata})
+       if existErr != nil {
+               return fmt.Errorf("exist call failed: %w", existErr)
+       }
+       if existResp.HasStream {
+               return fmt.Errorf("exist should return false for deleted 
stream")
+       }
+
+       listResp, listErr := clients.StreamRegClient.List(ctx, 
&databasev1.StreamRegistryServiceListRequest{Group: groupName})
+       if listErr != nil {
+               return fmt.Errorf("list call failed: %w", listErr)
+       }
+       for _, s := range listResp.Stream {
+               if s.Metadata.Name == streamName {
+                       return fmt.Errorf("deleted stream should not appear in 
list")
+               }
+       }
+
+       if err := writeStreamData(ctx, clients.StreamWriteClient, groupName, 
streamName, 1); err == nil {
+               return fmt.Errorf("write to deleted stream should fail")
+       }
+
+       return nil
+}
+
+func verifyStreamAvailableAndWritable(ctx context.Context, clients *Clients, 
groupName, streamName string) error {
+       metadata := &commonv1.Metadata{Name: streamName, Group: groupName}
+
+       if _, err := clients.StreamRegClient.Get(ctx, 
&databasev1.StreamRegistryServiceGetRequest{Metadata: metadata}); err != nil {
+               return fmt.Errorf("get stream failed: %w", err)
+       }
+
+       existResp, err := clients.StreamRegClient.Exist(ctx, 
&databasev1.StreamRegistryServiceExistRequest{Metadata: metadata})
+       if err != nil {
+               return fmt.Errorf("exist stream failed: %w", err)
+       }
+       if !existResp.HasStream {
+               return fmt.Errorf("stream should exist")
+       }
+
+       if err := writeStreamData(ctx, clients.StreamWriteClient, groupName, 
streamName, 1); err != nil {
+               return fmt.Errorf("write to active stream failed: %w", err)
+       }
+
+       return nil
+}
+
+func verifyTraceDeletionEffects(ctx context.Context, clients *Clients, 
groupName, traceName string) error {
+       metadata := &commonv1.Metadata{Name: traceName, Group: groupName}
+
+       _, getErr := clients.TraceRegClient.Get(ctx, 
&databasev1.TraceRegistryServiceGetRequest{Metadata: metadata})
+       if getErr == nil {
+               return fmt.Errorf("get should return error for deleted trace")
+       }
+       st, ok := status.FromError(getErr)
+       if !ok || st.Code() != codes.NotFound {
+               return fmt.Errorf("get should return NotFound, got: %v", 
st.Code())
+       }
+
+       existResp, existErr := clients.TraceRegClient.Exist(ctx, 
&databasev1.TraceRegistryServiceExistRequest{Metadata: metadata})
+       if existErr != nil {
+               return fmt.Errorf("exist call failed: %w", existErr)
+       }
+       if existResp.HasTrace {
+               return fmt.Errorf("exist should return false for deleted trace")
+       }
+
+       listResp, listErr := clients.TraceRegClient.List(ctx, 
&databasev1.TraceRegistryServiceListRequest{Group: groupName})
+       if listErr != nil {
+               return fmt.Errorf("list call failed: %w", listErr)
+       }
+       for _, t := range listResp.Trace {
+               if t.Metadata.Name == traceName {
+                       return fmt.Errorf("deleted trace should not appear in 
list")
+               }
+       }
+
+       if err := writeTraceData(ctx, clients.TraceWriteClient, groupName, 
traceName, 1); err == nil {
+               return fmt.Errorf("write to deleted trace should fail")
+       }
+
+       return nil
+}
+
+func verifyMeasureQuery(ctx context.Context, client 
measurev1.MeasureServiceClient, groupName, measureName string, _ int) error {
+       now := time.Now().Truncate(time.Millisecond)
+       resp, err := client.Query(ctx, &measurev1.QueryRequest{
+               Groups: []string{groupName},
+               Name:   measureName,
+               TimeRange: &modelv1.TimeRange{
+                       Begin: timestamppb.New(now.Add(-1 * time.Hour)),
+                       End:   timestamppb.New(now.Add(1 * time.Hour)),
+               },
+               TagProjection: &modelv1.TagProjection{
+                       TagFamilies: []*modelv1.TagProjection_TagFamily{
+                               {Name: "default", Tags: []string{"id"}},
+                       },
+               },
+               FieldProjection: &measurev1.QueryRequest_FieldProjection{
+                       Names: []string{"value"},
+               },
+       })
+       if err != nil {
+               return fmt.Errorf("measure query failed: %w", err)
+       }
+       if len(resp.DataPoints) == 0 {
+               return fmt.Errorf("expected measure data points but got none")
+       }
+       return nil
+}
+
+func verifyStreamQuery(ctx context.Context, client 
streamv1.StreamServiceClient, groupName, streamName, serviceID string, _ int) 
error {
+       now := time.Now().Truncate(time.Millisecond)
+       queryReq := &streamv1.QueryRequest{
+               Groups: []string{groupName},
+               Name:   streamName,
+               TimeRange: &modelv1.TimeRange{
+                       Begin: timestamppb.New(now.Add(-1 * time.Hour)),
+                       End:   timestamppb.New(now.Add(1 * time.Hour)),
+               },
+               Projection: &modelv1.TagProjection{
+                       TagFamilies: []*modelv1.TagProjection_TagFamily{
+                               {Name: "default", Tags: []string{"svc"}},
+                       },
+               },
+       }
+       if serviceID != "" {
+               queryReq.Criteria = &modelv1.Criteria{Exp: 
&modelv1.Criteria_Condition{Condition: &modelv1.Condition{
+                       Name: "svc",
+                       Op:   modelv1.Condition_BINARY_OP_EQ,
+                       Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{
+                               Str: &modelv1.Str{Value: serviceID},
+                       }},
+               }}}
+       }
+       resp, err := client.Query(ctx, queryReq)
+       if err != nil {
+               return fmt.Errorf("stream query failed: %w", err)
+       }
+       if len(resp.Elements) == 0 {
+               return fmt.Errorf("expected stream elements but got none")
+       }
+       return nil
+}
+
+func verifyTraceQuery(ctx context.Context, client tracev1.TraceServiceClient, 
groupName, traceName, traceID string, _ int) error {
+       now := time.Now().Truncate(time.Millisecond)
+       queryReq := &tracev1.QueryRequest{
+               Groups: []string{groupName},
+               Name:   traceName,
+               TimeRange: &modelv1.TimeRange{
+                       Begin: timestamppb.New(now.Add(-1 * time.Hour)),
+                       End:   timestamppb.New(now.Add(1 * time.Hour)),
+               },
+       }
+       if traceID != "" {
+               queryReq.Criteria = &modelv1.Criteria{Exp: 
&modelv1.Criteria_Condition{Condition: &modelv1.Condition{
+                       Name: "trace_id",
+                       Op:   modelv1.Condition_BINARY_OP_EQ,
+                       Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{
+                               Str: &modelv1.Str{Value: traceID},
+                       }},
+               }}}
+               queryReq.TagProjection = []string{"trace_id"}
+       }
+       resp, err := client.Query(ctx, queryReq)
+       if err != nil {
+               return fmt.Errorf("trace query failed: %w", err)
+       }
+       if len(resp.Traces) == 0 {
+               return fmt.Errorf("expected trace data but got none")
+       }
+       return nil
+}
diff --git a/test/integration/distributed/schema/schema_suite_test.go 
b/test/integration/distributed/schema/schema_suite_test.go
new file mode 100644
index 000000000..51b79bd28
--- /dev/null
+++ b/test/integration/distributed/schema/schema_suite_test.go
@@ -0,0 +1,130 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_schema_test
+
+import (
+       "context"
+       "fmt"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+       casesschema "github.com/apache/skywalking-banyandb/test/cases/schema"
+)
+
// TestSchemaDeletion wires Ginkgo into the standard `go test` runner for the
// distributed schema-deletion suite.
func TestSchemaDeletion(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Distributed Schema Deletion Suite")
}
+
var (
	// deferFunc tears down the cluster; set in SynchronizedBeforeSuite and
	// invoked from ReportAfterSuite only on success.
	deferFunc func()
	// goods is the goroutine baseline captured before the suite starts,
	// used by gleak to detect leaks afterwards.
	goods []gleak.Goroutine
	// connection is this process's shared gRPC connection to the liaison node.
	connection *grpc.ClientConn
)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       pool.EnableStackTracking(true)
+       goods = gleak.Goroutines()
+       By("Starting etcd server")
+       ports, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+       dir, spaceDef, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       ep := fmt.Sprintf("http://127.0.0.1:%d";, ports[0])
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}),
+               embeddedetcd.RootDir(dir),
+       )
+       Expect(err).ShouldNot(HaveOccurred())
+       <-server.ReadyNotify()
+       By("Loading schema")
+       schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+               schema.Namespace(metadata.DefaultNamespace),
+               schema.ConfigureServerEndpoints([]string{ep}),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       defer schemaRegistry.Close()
+       ctx := context.Background()
+       test_stream.PreloadSchema(ctx, schemaRegistry)
+       test_measure.PreloadSchema(ctx, schemaRegistry)
+       test_trace.PreloadSchema(ctx, schemaRegistry)
+       By("Starting data node 0")
+       closeDataNode0 := setup.DataNode(ep)
+       By("Starting data node 1")
+       closeDataNode1 := setup.DataNode(ep)
+       By("Starting liaison node")
+       liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+       deferFunc = func() {
+               closerLiaisonNode()
+               closeDataNode0()
+               closeDataNode1()
+               _ = server.Close()
+               <-server.StopNotify()
+               spaceDef()
+       }
+       return []byte(liaisonAddr)
+}, func(address []byte) {
+       var err error
+       connection, err = grpchelper.Conn(string(address), 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       casesschema.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   time.Now(),
+       }
+       Expect(err).NotTo(HaveOccurred())
+})
+
// SynchronizedAfterSuite closes each process's gRPC connection; the final
// all-processes callback has nothing to clean up (cluster teardown happens
// in ReportAfterSuite via deferFunc).
var _ = SynchronizedAfterSuite(func() {
	if connection != nil {
		Expect(connection.Close()).To(Succeed())
	}
}, func() {})
+
// ReportAfterSuite tears the cluster down and checks for goroutine and
// pool-reference leaks — but only when the suite succeeded, leaving a failed
// run's environment intact for inspection.
var _ = ReportAfterSuite("Distributed Schema Deletion Suite", func(report Report) {
	if report.SuiteSucceeded {
		if deferFunc != nil {
			deferFunc()
		}
		Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
		Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
	}
})
diff --git a/test/integration/standalone/schema/schema_suite_test.go 
b/test/integration/standalone/schema/schema_suite_test.go
new file mode 100644
index 000000000..f41655901
--- /dev/null
+++ b/test/integration/standalone/schema/schema_suite_test.go
@@ -0,0 +1,87 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_schema_test
+
+import (
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       casesschema "github.com/apache/skywalking-banyandb/test/cases/schema"
+       integration_standalone 
"github.com/apache/skywalking-banyandb/test/integration/standalone"
+)
+
// TestSchemaDeletion wires Ginkgo into the standard `go test` runner for the
// standalone schema-deletion suite, tagged with the standalone integration
// labels so it can be filtered by the CI label selector.
func TestSchemaDeletion(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Standalone Schema Deletion Suite", Label(integration_standalone.Labels...))
}
+
var (
	// connection is this process's shared gRPC connection to the standalone node.
	connection *grpc.ClientConn
	// deferFunc shuts the standalone node down; invoked from ReportAfterSuite
	// only on success.
	deferFunc func()
	// goods is the goroutine baseline captured before the suite starts,
	// used by gleak to detect leaks afterwards.
	goods []gleak.Goroutine
)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       goods = gleak.Goroutines()
+       pool.EnableStackTracking(true)
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       addr, _, closeFn := setup.EmptyStandalone()
+       deferFunc = closeFn
+       return []byte(addr)
+}, func(address []byte) {
+       var err error
+       connection, err = grpchelper.Conn(string(address), 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       casesschema.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   time.Now(),
+       }
+       Expect(err).NotTo(HaveOccurred())
+})
+
// SynchronizedAfterSuite closes each process's gRPC connection; the final
// all-processes callback has nothing to clean up (node teardown happens in
// ReportAfterSuite via deferFunc).
var _ = SynchronizedAfterSuite(func() {
	if connection != nil {
		Expect(connection.Close()).To(Succeed())
	}
}, func() {})
+
// ReportAfterSuite tears the standalone node down and checks for goroutine
// and pool-reference leaks — but only when the suite succeeded, leaving a
// failed run's environment intact for inspection.
var _ = ReportAfterSuite("Standalone Schema Deletion Suite", func(report Report) {
	if report.SuiteSucceeded {
		if deferFunc != nil {
			deferFunc()
		}
		Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
		Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
	}
})


Reply via email to