This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch replace-physical-property-deletion in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 26d382a4d6b1b6a2b90c973972c400db57528f3d Author: mrproliu <741550...@qq.com> AuthorDate: Thu Jun 12 16:37:46 2025 +0800 Implement versioning properties and replace physical deletion with the tombstone mechanism for the property database --- CHANGES.md | 1 + api/proto/banyandb/property/v1/rpc.proto | 3 + banyand/liaison/grpc/property.go | 120 +++++++++------- banyand/liaison/grpc/server.go | 9 +- banyand/property/db.go | 13 +- banyand/property/listener.go | 12 +- banyand/property/property.go | 18 ++- banyand/property/shard.go | 84 ++++++++--- docs/api-reference.md | 1 + pkg/convert/number.go | 15 ++ pkg/convert/number_test.go | 19 +++ pkg/index/index.go | 1 + pkg/index/inverted/inverted.go | 4 + pkg/index/inverted/inverted_series.go | 3 + pkg/test/helpers/constant.go | 10 ++ pkg/test/property/etcd.go | 97 +++++++++++++ pkg/test/property/testdata/group.json | 10 ++ pkg/test/property/testdata/property/ui_menu.json | 21 +++ pkg/test/setup/setup.go | 7 + test/cases/property/data/data.go | 155 +++++++++++++++++++++ .../property/data/input/after_delete_first.yaml | 19 +++ test/cases/property/data/input/create.yaml | 32 +++++ test/cases/property/data/input/create_other.yaml | 32 +++++ test/cases/property/data/input/delete_first.yaml | 20 +++ test/cases/property/data/input/update_value.yaml | 32 +++++ .../property/data/want/after_delete_first.yaml | 31 +++++ test/cases/property/data/want/create.yaml | 31 +++++ test/cases/property/data/want/create_other.yaml | 31 +++++ test/cases/property/data/want/delete_first.yaml | 18 +++ test/cases/property/data/want/update_value.yaml | 31 +++++ test/cases/property/property.go | 78 +++++++++++ .../standalone/query/query_suite_test.go | 5 + 32 files changed, 886 insertions(+), 77 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e819a022..43cc1e2b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ Release Notes. - Replica: Support configurable replica count on Group. 
- Replica: Move the TopN pre-calculation flow from the Data Node to the Liaison Node. - Add a wait and retry to write handlers to avoid the local metadata cache being loaded. +- Implement versioning properties and replace physical deletion with the tombstone mechanism for the property database. ### Bug Fixes diff --git a/api/proto/banyandb/property/v1/rpc.proto b/api/proto/banyandb/property/v1/rpc.proto index 8f70291b..4ea60cef 100644 --- a/api/proto/banyandb/property/v1/rpc.proto +++ b/api/proto/banyandb/property/v1/rpc.proto @@ -122,4 +122,7 @@ message InternalDeleteRequest { message InternalQueryResponse { repeated bytes sources = 1; common.v1.Trace trace = 2; + // deletes indicates the property is deleted or not + // it's mapping to the sources in the same order + repeated bool deletes = 3; } diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go index b26569be..c9b92a7d 100644 --- a/banyand/liaison/grpc/property.go +++ b/banyand/liaison/grpc/property.go @@ -37,8 +37,6 @@ package grpc import ( "context" "math" - "strconv" - "strings" "time" "github.com/pkg/errors" @@ -54,6 +52,7 @@ import ( propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + propertypkg "github.com/apache/skywalking-banyandb/banyand/property" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" @@ -67,6 +66,7 @@ const defaultQueryTimeout = 10 * time.Second type propertyServer struct { propertyv1.UnimplementedPropertyServiceServer + *discoveryService schemaRegistry metadata.Repo pipeline queue.Client nodeRegistry NodeRegistry @@ -150,38 +150,37 @@ func (ps *propertyServer) Apply(ctx context.Context, req *propertyv1.ApplyReques var prev *propertyv1.Property if len(qResp.Properties) > 0 { prev = qResp.Properties[0] - defer 
func() { - if err == nil { - var ids [][]byte - for _, p := range qResp.Properties { - ids = append(ids, getPropertyID(p)) - } - if err = ps.remove(ids); err != nil { - err = multierr.Append(err, errors.New("fail to remove old properties")) - } - } - }() + // TODO(mrproliu) find older property by different version, should be remove it } - entity := getEntity(property) + entity := propertypkg.GetEntity(property) id, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum) if err != nil { return nil, err } - node, err := ps.nodeRegistry.Locate(g, entity, uint32(id), 0) - if err != nil { - return nil, err + copies, ok := ps.groupRepo.copies(property.Metadata.GetGroup()) + if !ok { + return nil, errors.New("failed to get group copies") + } + + nodes := make([]string, 0, copies) + for i := range copies { + nodeID, err := ps.nodeRegistry.Locate(property.GetMetadata().GetGroup(), property.GetMetadata().GetName(), uint32(id), i) + if err != nil { + return nil, err + } + nodes = append(nodes, nodeID) } if req.Strategy == propertyv1.ApplyRequest_STRATEGY_REPLACE { - return ps.replaceProperty(ctx, start, uint64(id), node, prev, property) + return ps.replaceProperty(ctx, start, uint64(id), nodes, prev, property) } - return ps.mergeProperty(ctx, start, uint64(id), node, prev, property) + return ps.mergeProperty(ctx, start, uint64(id), nodes, prev, property) } -func (ps *propertyServer) mergeProperty(ctx context.Context, now time.Time, shardID uint64, node string, +func (ps *propertyServer) mergeProperty(ctx context.Context, now time.Time, shardID uint64, nodes []string, prev, cur *propertyv1.Property, ) (*propertyv1.ApplyResponse, error) { if prev == nil { - return ps.replaceProperty(ctx, now, shardID, node, prev, cur) + return ps.replaceProperty(ctx, now, shardID, nodes, prev, cur) } tagCount, err := tagLen(prev) if err != nil { @@ -202,7 +201,7 @@ func (ps *propertyServer) mergeProperty(ctx context.Context, now time.Time, shar } } cur.Tags = 
append(cur.Tags, tags...) - return ps.replaceProperty(ctx, now, shardID, node, prev, cur) + return ps.replaceProperty(ctx, now, shardID, nodes, prev, cur) } func tagLen(property *propertyv1.Property) (uint32, error) { @@ -214,7 +213,7 @@ func tagLen(property *propertyv1.Property) (uint32, error) { return tagsNum, nil } -func (ps *propertyServer) replaceProperty(ctx context.Context, now time.Time, shardID uint64, node string, +func (ps *propertyServer) replaceProperty(ctx context.Context, now time.Time, shardID uint64, nodes []string, prev, cur *propertyv1.Property, ) (*propertyv1.ApplyResponse, error) { ns := now.UnixNano() @@ -225,17 +224,38 @@ func (ps *propertyServer) replaceProperty(ctx context.Context, now time.Time, sh } cur.Metadata.ModRevision = ns cur.UpdatedAt = timestamppb.New(now) - f, err := ps.pipeline.Publish(ctx, data.TopicPropertyUpdate, bus.NewMessageWithNode(bus.MessageID(time.Now().Unix()), node, &propertyv1.InternalUpdateRequest{ + req := &propertyv1.InternalUpdateRequest{ ShardId: shardID, - Id: getPropertyID(cur), + Id: propertypkg.GetPropertyID(cur), Property: cur, - })) - if err != nil { - return nil, err } - if _, err := f.Get(); err != nil { - return nil, err + futures := make([]bus.Future, 0, len(nodes)) + for _, node := range nodes { + f, err := ps.pipeline.Publish(ctx, data.TopicPropertyUpdate, + bus.NewMessageWithNode(bus.MessageID(time.Now().Unix()), node, req)) + if err != nil { + return nil, errors.Wrapf(err, "failed to publish property update to node %s", node) + } + futures = append(futures, f) + } + // Wait for all futures to complete, and which should last have one success + haveSuccess := false + var lastestError error + for _, f := range futures { + _, err := f.Get() + if err == nil { + haveSuccess = true + } else { + lastestError = multierr.Append(lastestError, err) + } + } + if !haveSuccess { + if lastestError != nil { + return nil, lastestError + } + return nil, errors.New("failed to apply property, no replicas success") } 
+ return &propertyv1.ApplyResponse{ Created: prev == nil, TagsNum: uint32(len(cur.Tags)), @@ -275,7 +295,7 @@ func (ps *propertyServer) Delete(ctx context.Context, req *propertyv1.DeleteRequ } var ids [][]byte for _, p := range qResp.Properties { - ids = append(ids, getPropertyID(p)) + ids = append(ids, propertypkg.GetPropertyID(p)) } if err := ps.remove(ids); err != nil { return nil, err @@ -324,7 +344,7 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques if err != nil { return nil, err } - res := make(map[string]*propertyv1.Property) + res := make(map[string]*propertyWithMetadata) for _, f := range ff { if m, getErr := f.Get(); getErr != nil { err = multierr.Append(err, getErr) @@ -335,24 +355,29 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques } switch v := d.(type) { case *propertyv1.InternalQueryResponse: - for _, s := range v.Sources { + for i, s := range v.Sources { var p propertyv1.Property + var deleted bool err = protojson.Unmarshal(s, &p) if err != nil { return nil, err } - entity := getEntity(&p) + if i < len(v.Deletes) { + deleted = v.Deletes[i] + } + entity := propertypkg.GetEntity(&p) cur, ok := res[entity] + property := &propertyWithMetadata{ + Property: &p, + deleted: deleted, + } if !ok { - res[entity] = &p + res[entity] = property continue } if cur.Metadata.ModRevision < p.Metadata.ModRevision { - res[entity] = &p - err = ps.remove([][]byte{getPropertyID(cur)}) - if err != nil { - return nil, err - } + res[entity] = property + // TODO(mrproliu) handle the case where the property detected multiple versions } } if span != nil { @@ -371,6 +396,10 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques } properties := make([]*propertyv1.Property, 0, len(res)) for _, p := range res { + // ignore deleted property + if p.deleted { + continue + } if len(req.TagProjection) > 0 { var tags []*modelv1.Tag for _, tag := range p.Tags { @@ -383,7 +412,7 @@ func (ps 
*propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques } p.Tags = tags } - properties = append(properties, p) + properties = append(properties, p.Property) if len(properties) >= int(req.Limit) { break } @@ -406,10 +435,7 @@ func (ps *propertyServer) remove(ids [][]byte) error { return nil } -func getPropertyID(prop *propertyv1.Property) []byte { - return convert.StringToBytes(getEntity(prop) + "/" + strconv.FormatInt(prop.Metadata.ModRevision, 10)) -} - -func getEntity(prop *propertyv1.Property) string { - return strings.Join([]string{prop.Metadata.Group, prop.Metadata.Name, prop.Id}, "/") +type propertyWithMetadata struct { + *propertyv1.Property + deleted bool } diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index 00c384c4..b12d1839 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -147,9 +147,10 @@ func NewServer(_ context.Context, pipeline, broadcaster queue.Client, topNPipeli schemaRegistry: schemaRegistry, }, propertyServer: &propertyServer{ - schemaRegistry: schemaRegistry, - pipeline: pipeline, - nodeRegistry: nr.PropertyNodeRegistry, + schemaRegistry: schemaRegistry, + pipeline: pipeline, + nodeRegistry: nr.PropertyNodeRegistry, + discoveryService: newDiscoveryService(schema.KindProperty, schemaRegistry, nr.MeasureNodeRegistry), }, propertyRegistryServer: &propertyRegistryServer{ schemaRegistry: schemaRegistry, @@ -164,9 +165,11 @@ func (s *server) PreRun(_ context.Context) error { s.log = logger.GetLogger("liaison-grpc") s.streamSVC.setLogger(s.log) s.measureSVC.setLogger(s.log) + s.propertyServer.SetLogger(s.log) components := []*discoveryService{ s.streamSVC.discoveryService, s.measureSVC.discoveryService, + s.propertyServer.discoveryService, } for _, c := range components { c.SetLogger(s.log) diff --git a/banyand/property/db.go b/banyand/property/db.go index 8cc28629..34c552e1 100644 --- a/banyand/property/db.go +++ b/banyand/property/db.go @@ -111,19 +111,19 @@ func (db 
*database) update(ctx context.Context, shardID common.ShardID, id []byt return nil } -func (db *database) delete(docIDs [][]byte) error { +func (db *database) delete(ctx context.Context, docIDs [][]byte) error { sLst := db.sLst.Load() if sLst == nil { return nil } var err error for _, s := range *sLst { - multierr.AppendInto(&err, s.delete(docIDs)) + multierr.AppendInto(&err, s.delete(ctx, docIDs)) } return err } -func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([][]byte, error) { +func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]*queryProperty, error) { iq, err := inverted.BuildPropertyQuery(req, groupField, entityID) if err != nil { return nil, err @@ -132,7 +132,7 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([] if sLst == nil { return nil, nil } - var res [][]byte + var res []*queryProperty for _, s := range *sLst { r, err := s.search(ctx, iq, int(req.Limit)) if err != nil { @@ -224,3 +224,8 @@ func walkDir(root, prefix string, wf walkFn) error { } return nil } + +type queryProperty struct { + source []byte + deleted bool +} diff --git a/banyand/property/listener.go b/banyand/property/listener.go index f02b7ac2..f204816f 100644 --- a/banyand/property/listener.go +++ b/banyand/property/listener.go @@ -110,7 +110,7 @@ type deleteListener struct { s *service } -func (h *deleteListener) Rev(_ context.Context, message bus.Message) (resp bus.Message) { +func (h *deleteListener) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { n := time.Now() now := n.UnixNano() var protoReq proto.Message @@ -130,7 +130,7 @@ func (h *deleteListener) Rev(_ context.Context, message bus.Message) (resp bus.M resp = bus.NewMessage(bus.MessageID(now), common.NewError("id is empty")) return } - err := h.s.db.delete(d.Ids) + err := h.s.db.delete(ctx, d.Ids) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to delete property: %v", err)) return @@ 
-180,7 +180,7 @@ func (h *queryListener) Rev(ctx context.Context, message bus.Message) (resp bus. span.Stop() }() } - sources, err := h.s.db.query(ctx, d) + properties, err := h.s.db.query(ctx, d) if err != nil { if tracer != nil { span.Error(err) @@ -192,8 +192,10 @@ func (h *queryListener) Rev(ctx context.Context, message bus.Message) (resp bus. resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to query property: %v", err)) return } - qResp := &propertyv1.InternalQueryResponse{ - Sources: sources, + qResp := &propertyv1.InternalQueryResponse{} + for _, p := range properties { + qResp.Sources = append(qResp.Sources, p.source) + qResp.Deletes = append(qResp.Deletes, p.deleted) } if tracer != nil { qResp.Trace = tracer.ToProto() diff --git a/banyand/property/property.go b/banyand/property/property.go index 59602ff4..77fdd3ec 100644 --- a/banyand/property/property.go +++ b/banyand/property/property.go @@ -18,7 +18,15 @@ // Package property provides the property service interface. package property -import "github.com/apache/skywalking-banyandb/pkg/run" +import ( + "strconv" + "strings" + + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/run" + + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" +) // Service is the interface for property service. 
type Service interface { @@ -26,3 +34,11 @@ type Service interface { run.Config run.Service } + +func GetPropertyID(prop *propertyv1.Property) []byte { + return convert.StringToBytes(GetEntity(prop) + "/" + strconv.FormatInt(prop.Metadata.ModRevision, 10)) +} + +func GetEntity(prop *propertyv1.Property) string { + return strings.Join([]string{prop.Metadata.Group, prop.Metadata.Name, prop.Id}, "/") +} diff --git a/banyand/property/shard.go b/banyand/property/shard.go index 3fd8d3bd..540c3674 100644 --- a/banyand/property/shard.go +++ b/banyand/property/shard.go @@ -22,11 +22,11 @@ import ( "fmt" "path" "strconv" + "time" - "google.golang.org/protobuf/encoding/protojson" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/api/common" - propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/inverted" @@ -34,6 +34,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/meter" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" + "google.golang.org/protobuf/encoding/protojson" ) const ( @@ -42,14 +43,16 @@ const ( groupField = "_group" nameField = index.IndexModeName entityID = "_entity_id" + deleteField = "_deleted" ) var ( - sourceFieldKey = index.FieldKey{TagName: sourceField} - entityFieldKey = index.FieldKey{TagName: entityID} - groupFieldKey = index.FieldKey{TagName: groupField} - nameFieldKey = index.FieldKey{TagName: nameField} - projection = []index.FieldKey{sourceFieldKey} + sourceFieldKey = index.FieldKey{TagName: sourceField} + entityFieldKey = index.FieldKey{TagName: entityID} + groupFieldKey = index.FieldKey{TagName: groupField} + nameFieldKey = index.FieldKey{TagName: nameField} + deletedFieldKey = index.FieldKey{TagName: deleteField} + projection = 
[]index.FieldKey{sourceFieldKey, deletedFieldKey} ) type shard struct { @@ -89,9 +92,19 @@ func (db *database) newShard(ctx context.Context, id common.ShardID, flushTimeou } func (s *shard) update(id []byte, property *propertyv1.Property) error { + document, err := s.buildUpdateDocument(id, property) + if err != nil { + return fmt.Errorf("build update document failure: %v", err) + } + return s.store.UpdateSeriesBatch(index.Batch{ + Documents: index.Documents{*document}, + }) +} + +func (s *shard) buildUpdateDocument(id []byte, property *propertyv1.Property) (*index.Document, error) { pj, err := protojson.Marshal(property) if err != nil { - return err + return nil, err } sourceField := index.NewBytesField(sourceFieldKey, pj) sourceField.NoSort = true @@ -110,24 +123,56 @@ func (s *shard) update(id []byte, property *propertyv1.Property) error { for _, t := range property.Tags { tv, err := pbv1.MarshalTagValue(t.Value) if err != nil { - return err + return nil, err } tagField := index.NewBytesField(index.FieldKey{IndexRuleID: uint32(convert.HashStr(t.Key))}, tv) tagField.Index = true tagField.NoSort = true doc.Fields = append(doc.Fields, tagField) } - return s.store.UpdateSeriesBatch(index.Batch{ - Documents: index.Documents{doc}, - }) + return &doc, nil } -func (s *shard) delete(docID [][]byte) error { - return s.store.Delete(docID) +func (s *shard) delete(ctx context.Context, docID [][]byte) error { + // search the original documents by docID + seriesMatchers := make([]index.SeriesMatcher, 0, len(docID)) + for _, id := range docID { + seriesMatchers = append(seriesMatchers, index.SeriesMatcher{ + Match: id, + Type: index.SeriesMatcherTypeExact, + }) + } + iq, err := s.store.BuildQuery(seriesMatchers, nil, nil) + if err != nil { + return fmt.Errorf("build property query failure: %v", err) + } + exisingDocList, err := s.search(ctx, iq, len(docID)) + if err != nil { + return fmt.Errorf("search existing documents failure: %v", err) + } + removeDocList := 
make(index.Documents, 0, len(exisingDocList)) + for _, property := range exisingDocList { + p := &propertyv1.Property{} + if err := protojson.Unmarshal(property.source, p); err != nil { + return fmt.Errorf("unmarshal property failure: %v", err) + } + // update the property metadata to mark it as deleted + p.Metadata.ModRevision = time.Now().UnixNano() + document, err := s.buildUpdateDocument(GetPropertyID(p), p) + if err != nil { + return fmt.Errorf("build delete document failure: %v", err) + } + // mark the document as deleted + document.Deleted = true + removeDocList = append(removeDocList, *document) + } + return s.store.UpdateSeriesBatch(index.Batch{ + Documents: removeDocList, + }) } func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, -) (data [][]byte, err error) { +) (data []*queryProperty, err error) { tracer := query.GetTracer(ctx) if tracer != nil { span, _ := tracer.StartSpan(ctx, "property.search") @@ -150,9 +195,14 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, if len(ss) == 0 { return nil, nil } - data = make([][]byte, 0, len(ss)) + data = make([]*queryProperty, 0, len(ss)) for _, s := range ss { - data = append(data, s.Fields[sourceField]) + bytes := s.Fields[sourceField] + deleted := convert.BytesToBool(s.Fields[deleteField]) + data = append(data, &queryProperty{ + source: bytes, + deleted: deleted, + }) } return data, nil } diff --git a/docs/api-reference.md b/docs/api-reference.md index 40f72f7b..72185c0d 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -3401,6 +3401,7 @@ Property stores the user defined data | ----- | ---- | ----- | ----------- | | sources | [bytes](#bytes) | repeated | | | trace | [banyandb.common.v1.Trace](#banyandb-common-v1-Trace) | | | +| deletes | [bool](#bool) | repeated | deletes indicates the property is deleted or not it's mapping to the sources in the same order | diff --git a/pkg/convert/number.go b/pkg/convert/number.go index 
0074c588..175e0f9a 100644 --- a/pkg/convert/number.go +++ b/pkg/convert/number.go @@ -51,6 +51,14 @@ func Uint32ToBytes(u uint32) []byte { return bs } +// BoolToBytes converts bool to bytes. +func BoolToBytes(b bool) []byte { + if b { + return []byte{1} + } + return []byte{0} +} + // BytesToInt64 converts bytes to int64. func BytesToInt64(b []byte) int64 { u := binary.BigEndian.Uint64(b) @@ -87,3 +95,10 @@ func Float64ToBytes(f float64) []byte { func BytesToFloat64(b []byte) float64 { return math.Float64frombits(binary.BigEndian.Uint64(b)) } + +func BytesToBool(b []byte) bool { + if len(b) == 0 { + return false + } + return b[0] != 0 +} diff --git a/pkg/convert/number_test.go b/pkg/convert/number_test.go index 9fcd8473..1a5b310c 100644 --- a/pkg/convert/number_test.go +++ b/pkg/convert/number_test.go @@ -46,3 +46,22 @@ func TestInt64ToBytes(t *testing.T) { }) } } + +func TestBoolToBytes(t *testing.T) { + testCases := []struct { + expected []byte + input bool + }{ + {[]byte{1}, true}, + {[]byte{0}, false}, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("BoolToBytes(%t)", tc.input), func(t *testing.T) { + result := BoolToBytes(tc.input) + if !bytes.Equal(result, tc.expected) { + t.Errorf("Expected %v, got %v", tc.expected, result) + } + }) + } +} diff --git a/pkg/index/index.go b/pkg/index/index.go index e13334b2..62273996 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -300,6 +300,7 @@ type Document struct { Timestamp int64 DocID uint64 Version int64 + Deleted bool // for logical deletion } // Documents is a collection of documents. 
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 9ac005af..8450e647 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -52,6 +52,7 @@ const ( timestampField = "_timestamp" versionField = "_version" sourceField = "_source" + deletedField = "_deleted" ) var ( @@ -141,6 +142,9 @@ func (s *store) Batch(batch index.Batch) error { if d.Timestamp > 0 { doc.AddField(bluge.NewDateTimeField(timestampField, time.Unix(0, d.Timestamp)).StoreValue()) } + if d.Deleted { + doc.AddField(bluge.NewStoredOnlyField(deletedField, convert.BoolToBytes(true)).StoreValue()) + } b.Insert(doc) } return s.writer.Batch(b) diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index 93c95c88..b904b702 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -121,6 +121,9 @@ func toDoc(d index.Document, toParseFieldNames bool) (*bluge.Document, []string) vf := bluge.NewStoredOnlyField(versionField, convert.Int64ToBytes(d.Version)) doc.AddField(vf) } + if d.Deleted { + doc.AddField(bluge.NewStoredOnlyField(deletedField, convert.BoolToBytes(d.Deleted)).StoreValue()) + } return doc, fieldNames } diff --git a/pkg/test/helpers/constant.go b/pkg/test/helpers/constant.go index c0bf699c..d45a3294 100644 --- a/pkg/test/helpers/constant.go +++ b/pkg/test/helpers/constant.go @@ -37,6 +37,15 @@ type SharedContext struct { BaseTime time.Time } +type TestMode int + +const ( + TestModeQuery TestMode = iota + TestModeCreate + TestModeUpdate + TestModeDelete +) + // Args is a wrapper seals all necessary info for table specs. type Args struct { Begin *timestamppb.Timestamp @@ -50,6 +59,7 @@ type Args struct { WantErr bool DisOrder bool IgnoreElementID bool + Mode TestMode } // UnmarshalYAML decodes YAML raw bytes to proto.Message. 
diff --git a/pkg/test/property/etcd.go b/pkg/test/property/etcd.go new file mode 100644 index 00000000..05f2aa75 --- /dev/null +++ b/pkg/test/property/etcd.go @@ -0,0 +1,97 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "embed" + "encoding/json" + "errors" + "path" + + "google.golang.org/protobuf/encoding/protojson" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +const ( + propertyDir = "testdata/property" +) + +var ( + //go:embed testdata/property/*.json + propertyStore embed.FS + //go:embed testdata/group.json + groupJSON string +) + +// loadSchemas loads streams, index rules, and index rule bindings. 
+func loadSchemas(ctx context.Context, e schema.Registry) error { + streams, err := propertyStore.ReadDir(propertyDir) + if err != nil { + return err + } + var data []byte + for _, entry := range streams { + data, err = propertyStore.ReadFile(path.Join(propertyDir, entry.Name())) + if err != nil { + return err + } + var property databasev1.Property + err = protojson.Unmarshal(data, &property) + if err != nil { + return err + } + if innerErr := e.CreateProperty(ctx, &property); innerErr != nil { + return innerErr + } + } + + return nil +} + +// PreloadSchema loads schemas from files in the booting process. +// This version loads group without stages. +func PreloadSchema(ctx context.Context, e schema.Registry) error { + if e == nil { + return nil + } + var rawGroups []json.RawMessage + if err := json.Unmarshal([]byte(groupJSON), &rawGroups); err != nil { + return err + } + for _, raw := range rawGroups { + g := &commonv1.Group{} + if err := protojson.Unmarshal(raw, g); err != nil { + return err + } + _, err := e.GetGroup(ctx, g.Metadata.Name) + if !errors.Is(err, schema.ErrGRPCResourceNotFound) { + logger.Infof("group %s already exists", g.Metadata.Name) + return nil + } + if innerErr := e.CreateGroup(ctx, g); innerErr != nil { + return innerErr + } + } + + return loadSchemas(ctx, e) +} diff --git a/pkg/test/property/testdata/group.json b/pkg/test/property/testdata/group.json new file mode 100644 index 00000000..5dfe7c32 --- /dev/null +++ b/pkg/test/property/testdata/group.json @@ -0,0 +1,10 @@ +[{ + "metadata": { + "name": "sw_property" + }, + "catalog": "CATALOG_PROPERTY", + "resource_opts": { + "shard_num": 2 + }, + "updated_at": "2021-04-15T01:30:15.01Z" +}] \ No newline at end of file diff --git a/pkg/test/property/testdata/property/ui_menu.json b/pkg/test/property/testdata/property/ui_menu.json new file mode 100644 index 00000000..45d69c75 --- /dev/null +++ b/pkg/test/property/testdata/property/ui_menu.json @@ -0,0 +1,21 @@ +{ + "metadata": { + "group": 
"sw_property", + "name": "ui_menu" + }, + "tags": [ + { + "name": "menu_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "configuration", + "type": "TAG_TYPE_STRING" + }, + { + "name": "update_time", + "type": "TAG_TYPE_INT" + } + ], + "updated_at": "2024-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 53f87cad..9c5cd598 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -38,6 +38,7 @@ import ( testflags "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure" + test_property "github.com/apache/skywalking-banyandb/pkg/test/property" test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream" ) @@ -48,6 +49,7 @@ func Standalone(flags ...string) (string, string, func()) { return StandaloneWithSchemaLoaders([]SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, + &preloadService{name: "property"}, }, "", "", flags...) } @@ -56,6 +58,7 @@ func StandaloneWithTLS(certFile, keyFile string, flags ...string) (string, strin return StandaloneWithSchemaLoaders([]SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, + &preloadService{name: "property"}, }, certFile, keyFile, flags...) } @@ -83,6 +86,7 @@ func ClosableStandalone(path string, ports []int, flags ...string) (string, stri return standaloneServer(path, ports, []SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, + &preloadService{name: "property"}, }, "", "", flags...) 
} @@ -178,6 +182,9 @@ func (p *preloadService) PreRun(ctx context.Context) error { if p.name == "stream" { return test_stream.PreloadSchema(ctx, p.registry) } + if p.name == "property" { + return test_property.PreloadSchema(ctx, p.registry) + } return test_measure.PreloadSchema(ctx, p.registry) } diff --git a/test/cases/property/data/data.go b/test/cases/property/data/data.go new file mode 100644 index 00000000..e99a4ceb --- /dev/null +++ b/test/cases/property/data/data.go @@ -0,0 +1,155 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package data + +import ( + "context" + "embed" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + + "github.com/google/go-cmp/cmp" + + g "github.com/onsi/ginkgo/v2" + gm "github.com/onsi/gomega" + + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/testing/protocmp" + + "sigs.k8s.io/yaml" +) + +//go:embed input/*.yaml +var inputFS embed.FS + +//go:embed want/*.yaml +var wantFS embed.FS + +// VerifyFn verify whether the query response matches the wanted result. 
+// It reads input/<args.Input>.yaml and want/<args.Want>.yaml from the embedded
+// file systems (args.Want falls back to args.Input when empty) and dispatches
+// on args.Mode. A mode without a case below performs no verification.
+var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args helpers.Args) {
+	i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
+	innerGm.Expect(err).NotTo(gm.HaveOccurred())
+	if args.Want == "" {
+		args.Want = args.Input
+	}
+	ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
+	innerGm.Expect(err).NotTo(gm.HaveOccurred())
+
+	switch args.Mode {
+	case helpers.TestModeCreate:
+		updateVerifier(i, ww, innerGm, sharedContext, args, true)
+	case helpers.TestModeUpdate:
+		updateVerifier(i, ww, innerGm, sharedContext, args, false)
+	case helpers.TestModeDelete:
+		deleteVerifier(i, ww, innerGm, sharedContext, args)
+	case helpers.TestModeQuery:
+		queryVerifier(i, ww, innerGm, sharedContext, args)
+	}
+}
+
+// deleteVerifier sends the DeleteRequest decoded from input, expects the
+// response to report Deleted, and then queries the same group/name/id and
+// compares the result with want. When args.WantErr is set it only requires the
+// Delete call to fail.
+func deleteVerifier(input []byte, want []byte, innerGm gm.Gomega, sharedContext helpers.SharedContext, args helpers.Args) {
+	del := &propertyv1.DeleteRequest{}
+	helpers.UnmarshalYAML(input, del)
+	c := propertyv1.NewPropertyServiceClient(sharedContext.Connection)
+	ctx := context.Background()
+	resp, err := c.Delete(ctx, del)
+	if args.WantErr {
+		if err == nil {
+			g.Fail("expected error")
+		}
+		return
+	}
+	// Check err before touching resp: a failed call must surface as an
+	// assertion failure, not as a nil-pointer panic on resp.Deleted.
+	innerGm.Expect(err).NotTo(gm.HaveOccurred())
+	innerGm.Expect(resp.Deleted).To(gm.BeTrue())
+	queryRequest := &propertyv1.QueryRequest{
+		Groups: []string{del.Group},
+		Name:   del.Name,
+		Ids:    []string{del.Id},
+	}
+	verifyQuery(want, ctx, c, queryRequest, args, innerGm)
+}
+
+// queryVerifier decodes input as a QueryRequest, runs it and compares the
+// response with want.
+func queryVerifier(input, want []byte, innerGm gm.Gomega, sharedContext helpers.SharedContext, args helpers.Args) {
+	queryRequest := &propertyv1.QueryRequest{}
+	helpers.UnmarshalYAML(input, queryRequest)
+	c := propertyv1.NewPropertyServiceClient(sharedContext.Connection)
+	ctx := context.Background()
+	verifyQuery(want, ctx, c, queryRequest, args, innerGm)
+}
+
+// updateVerifier applies the ApplyRequest decoded from input and expects the
+// response's Created flag to equal create. It then queries the applied
+// property by group/name/id, compares it with want, and finally repeats the
+// query with tracing enabled to check that trace spans are returned. When
+// args.WantErr is set it only requires the Apply call to fail.
+func updateVerifier(input, want []byte, innerGm gm.Gomega, sharedContext helpers.SharedContext, args helpers.Args, create bool) {
+	apply := &propertyv1.ApplyRequest{}
+	helpers.UnmarshalYAML(input, apply)
+	c := propertyv1.NewPropertyServiceClient(sharedContext.Connection)
+	ctx := context.Background()
+	resp, err := c.Apply(ctx, apply)
+	if args.WantErr {
+		if err == nil {
+			g.Fail("expected error")
+		}
+		return
+	}
+	// Assert through innerGm rather than the package-level gm.Expect: this
+	// function runs inside a polled Eventually, and only failures reported
+	// through the supplied Gomega are retried instead of aborting the spec.
+	innerGm.Expect(err).NotTo(gm.HaveOccurred())
+	innerGm.Expect(resp.Created).To(gm.Equal(create))
+	queryRequest := &propertyv1.QueryRequest{
+		Groups: []string{apply.Property.Metadata.Group},
+		Name:   apply.Property.Metadata.Name,
+		Ids:    []string{apply.Property.Id},
+	}
+	if !verifyQuery(want, ctx, c, queryRequest, args, innerGm) {
+		return
+	}
+	queryRequest.Trace = true
+	q, err := c.Query(ctx, queryRequest)
+	innerGm.Expect(err).NotTo(gm.HaveOccurred())
+	innerGm.Expect(q.Trace).NotTo(gm.BeNil())
+	innerGm.Expect(q.Trace.GetSpans()).NotTo(gm.BeEmpty())
+}
+
+// verifyQuery runs queryRequest and returns the result of the final assertion.
+// With args.WantEmpty it only asserts that no properties are returned and
+// returns true. Otherwise the response is compared with the YAML-decoded want,
+// ignoring Property.updated_at and Metadata id/create_revision/mod_revision;
+// on mismatch the actual response is rendered as YAML in the failure message.
+func verifyQuery(want []byte, ctx context.Context, client propertyv1.PropertyServiceClient,
+	queryRequest *propertyv1.QueryRequest, args helpers.Args, innerGm gm.Gomega) bool {
+	query, err := client.Query(ctx, queryRequest)
+	innerGm.Expect(err).NotTo(gm.HaveOccurred())
+	if args.WantEmpty {
+		innerGm.Expect(query.Properties).To(gm.BeEmpty())
+		return true
+	}
+	wantResp := &propertyv1.QueryResponse{}
+	helpers.UnmarshalYAML(want, wantResp)
+	success := innerGm.Expect(cmp.Equal(query, wantResp,
+		protocmp.IgnoreUnknown(),
+		protocmp.IgnoreFields(&propertyv1.Property{}, "updated_at"),
+		protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"),
+		protocmp.Transform())).
+		To(gm.BeTrue(), func() string {
+			var j []byte
+			j, err = protojson.Marshal(query)
+			if err != nil {
+				return err.Error()
+			}
+			var y []byte
+			y, err = yaml.JSONToYAML(j)
+			if err != nil {
+				return err.Error()
+			}
+			return string(y)
+		})
+	return success
+}
diff --git a/test/cases/property/data/input/after_delete_first.yaml b/test/cases/property/data/input/after_delete_first.yaml
new file mode 100644
index 00000000..65485ed0
--- /dev/null
+++ b/test/cases/property/data/input/after_delete_first.yaml
@@ -0,0 +1,19 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+groups: ["sw_property"]
+name: "ui_menu"
diff --git a/test/cases/property/data/input/create.yaml b/test/cases/property/data/input/create.yaml
new file mode 100644
index 00000000..68bef98e
--- /dev/null
+++ b/test/cases/property/data/input/create.yaml
@@ -0,0 +1,32 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership.
Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +property: + metadata: + group: "sw_property" + name: "ui_menu" + id: "1" + tags: + - key: "configuration" + value: + str: + value: "[{\"test\": \"test\"}]" + - key: "update_time" + value: + int: + value: 1749613769631 +strategy: "STRATEGY_MERGE" \ No newline at end of file diff --git a/test/cases/property/data/input/create_other.yaml b/test/cases/property/data/input/create_other.yaml new file mode 100644 index 00000000..2ab8668d --- /dev/null +++ b/test/cases/property/data/input/create_other.yaml @@ -0,0 +1,32 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +property: + metadata: + group: "sw_property" + name: "ui_menu" + id: "2" + tags: + - key: "configuration" + value: + str: + value: "[{\"test\": \"abc\"}]" + - key: "update_time" + value: + int: + value: 1749613769784 +strategy: "STRATEGY_MERGE" \ No newline at end of file diff --git a/test/cases/property/data/input/delete_first.yaml b/test/cases/property/data/input/delete_first.yaml new file mode 100644 index 00000000..d3c3f24a --- /dev/null +++ b/test/cases/property/data/input/delete_first.yaml @@ -0,0 +1,20 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +group: "sw_property" +name: "ui_menu" +id: "1" \ No newline at end of file diff --git a/test/cases/property/data/input/update_value.yaml b/test/cases/property/data/input/update_value.yaml new file mode 100644 index 00000000..3b554ad1 --- /dev/null +++ b/test/cases/property/data/input/update_value.yaml @@ -0,0 +1,32 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. 
Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +property: + metadata: + group: "sw_property" + name: "ui_menu" + id: "1" + tags: + - key: "configuration" + value: + str: + value: "[{\"test\": \"test123456\"}]" + - key: "update_time" + value: + int: + value: 1749613769975 +strategy: "STRATEGY_MERGE" \ No newline at end of file diff --git a/test/cases/property/data/want/after_delete_first.yaml b/test/cases/property/data/want/after_delete_first.yaml new file mode 100644 index 00000000..63d8801b --- /dev/null +++ b/test/cases/property/data/want/after_delete_first.yaml @@ -0,0 +1,31 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +properties: + - metadata: + group: "sw_property" + name: "ui_menu" + id: "2" + tags: + - key: "configuration" + value: + str: + value: '[{"test": "abc"}]' + - key: "update_time" + value: + int: + value: 1749613769784 \ No newline at end of file diff --git a/test/cases/property/data/want/create.yaml b/test/cases/property/data/want/create.yaml new file mode 100644 index 00000000..31180170 --- /dev/null +++ b/test/cases/property/data/want/create.yaml @@ -0,0 +1,31 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +properties: + - metadata: + group: "sw_property" + name: "ui_menu" + id: "1" + tags: + - key: "configuration" + value: + str: + value: '[{"test": "test"}]' + - key: "update_time" + value: + int: + value: 1749613769631 \ No newline at end of file diff --git a/test/cases/property/data/want/create_other.yaml b/test/cases/property/data/want/create_other.yaml new file mode 100644 index 00000000..63d8801b --- /dev/null +++ b/test/cases/property/data/want/create_other.yaml @@ -0,0 +1,31 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. 
Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +properties: + - metadata: + group: "sw_property" + name: "ui_menu" + id: "2" + tags: + - key: "configuration" + value: + str: + value: '[{"test": "abc"}]' + - key: "update_time" + value: + int: + value: 1749613769784 \ No newline at end of file diff --git a/test/cases/property/data/want/delete_first.yaml b/test/cases/property/data/want/delete_first.yaml new file mode 100644 index 00000000..a2b46d56 --- /dev/null +++ b/test/cases/property/data/want/delete_first.yaml @@ -0,0 +1,18 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +properties: [] \ No newline at end of file diff --git a/test/cases/property/data/want/update_value.yaml b/test/cases/property/data/want/update_value.yaml new file mode 100644 index 00000000..035104b8 --- /dev/null +++ b/test/cases/property/data/want/update_value.yaml @@ -0,0 +1,31 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +properties: + - metadata: + group: "sw_property" + name: "ui_menu" + id: "1" + tags: + - key: "configuration" + value: + str: + value: '[{"test": "test123456"}]' + - key: "update_time" + value: + int: + value: 1749613769975 \ No newline at end of file diff --git a/test/cases/property/property.go b/test/cases/property/property.go new file mode 100644 index 00000000..328e8e35 --- /dev/null +++ b/test/cases/property/property.go @@ -0,0 +1,78 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property_test
+
+import (
+	"time"
+
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
+	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+	propertyTestData "github.com/apache/skywalking-banyandb/test/cases/property/data"
+
+	g "github.com/onsi/ginkgo/v2"
+	gm "github.com/onsi/gomega"
+)
+
+var (
+	// SharedContext is the parallel execution context.
+	SharedContext helpers.SharedContext
+	// verify polls propertyTestData.VerifyFn every 2 seconds until it succeeds
+	// or flags.EventuallyTimeout elapses. The timeout is given exactly once:
+	// a chained WithTimeout would silently override the positional value.
+	verify = func(args helpers.Args) {
+		gm.Eventually(func(innerGm gm.Gomega) {
+			propertyTestData.VerifyFn(innerGm, SharedContext, args)
+		}, flags.EventuallyTimeout).WithPolling(2 * time.Second).Should(gm.Succeed())
+	}
+)
+
+// The lifecycle steps run in order inside a single spec; each step is
+// announced with g.By and checked with verify.
+var _ = g.Describe("Property Tests", func() {
+	g.It("property lifecycle", func() {
+		for _, entry := range []Entry{
+			{
+				Desc: "create new property",
+				Args: helpers.Args{Input: "create", Mode: helpers.TestModeCreate},
+			},
+			{
+				Desc: "update first property",
+				Args: helpers.Args{Input: "create", Mode: helpers.TestModeUpdate},
+			},
+			{
+				Desc: "update first property with different value",
+				Args: helpers.Args{Input: "update_value", Mode: helpers.TestModeUpdate},
+			},
+			{
+				Desc: "create second property",
+				Args: helpers.Args{Input: "create_other", Mode: helpers.TestModeCreate},
+			},
+			{
+				Desc: "delete first property",
+				Args: helpers.Args{Input: "delete_first", Mode: helpers.TestModeDelete},
+			},
+			{
+				Desc: "select all after delete first property",
+				Args: helpers.Args{Input: "after_delete_first", Mode: helpers.TestModeQuery},
+			},
+		} {
+			g.By(entry.Desc)
+			verify(entry.Args)
+		}
+	})
+})
+
+// Entry is one step of the property lifecycle scenario: Desc is reported via
+// g.By and Args is passed to verify.
+type Entry struct {
+	Desc string
+	Args helpers.Args
+}
diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go
index 7c576550..379463f3 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/integration/standalone/query/query_suite_test.go
@@ -37,6 +37,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 	test_cases "github.com/apache/skywalking-banyandb/test/cases"
 	casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+	casesproperty "github.com/apache/skywalking-banyandb/test/cases/property"
 	casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
 	casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
 	integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
@@ -82,6 +83,10 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 		Connection: connection,
 		BaseTime:   now,
 	}
+	casesproperty.SharedContext = helpers.SharedContext{
+		Connection: connection,
+		BaseTime:   now,
+	}
 	Expect(err).NotTo(HaveOccurred())
 })