This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 9552688d Add mod revision check to write requests(measure/stream) (#322) 9552688d is described below commit 9552688d60f055f6bb539a91464e8b1db77e7309 Author: hailin0 <wanghai...@apache.org> AuthorDate: Fri Sep 15 14:21:32 2023 +0800 Add mod revision check to write requests(measure/stream) (#322) * Add mod revision check to write requests(measure/stream) - Support for create/update schema check mod revision - Support for write data check mod revision - Adapt to web console requests - Update OAP e2e image - Fix query failure caused by schema change --------- Co-authored-by: Gao Hongtao <hanahm...@gmail.com> --- CHANGES.md | 1 + api/proto/banyandb/database/v1/rpc.proto | 16 ++++-- api/proto/banyandb/measure/v1/write.proto | 12 ++++- api/proto/banyandb/model/v1/write.proto | 33 ++++++++++++ api/proto/banyandb/stream/v1/write.proto | 14 ++++- banyand/liaison/grpc/discovery.go | 15 +++--- banyand/liaison/grpc/measure.go | 26 ++++++--- banyand/liaison/grpc/registry.go | 28 +++++++--- banyand/liaison/grpc/stream.go | 26 ++++++--- banyand/measure/measure_topn.go | 3 +- banyand/measure/measure_write.go | 1 + banyand/measure/metadata.go | 4 +- banyand/measure/metadata_test.go | 4 +- banyand/metadata/schema/etcd.go | 82 +++++++++++++++-------------- banyand/metadata/schema/etcd_test.go | 2 +- banyand/metadata/schema/group.go | 6 ++- banyand/metadata/schema/index.go | 12 +++-- banyand/metadata/schema/measure.go | 15 +++--- banyand/metadata/schema/property.go | 4 +- banyand/metadata/schema/schema.go | 15 +++--- banyand/metadata/schema/shard.go | 5 +- banyand/metadata/schema/stream.go | 13 ++--- banyand/metadata/schema/topn.go | 6 ++- banyand/metadata/schema/watcher_test.go | 8 ++- banyand/stream/metadata_test.go | 4 +- docs/api-reference.md | 73 ++++++++++++++++++++++++- pkg/partition/entity.go | 15 +++--- pkg/query/logical/common.go | 23 +++++--- pkg/test/measure/etcd.go | 3 +- pkg/test/stream/etcd.go | 2 +- test/cases/measure/data/data.go | 18 +++++-- test/cases/stream/data/data.go | 23 +++++--- test/docker/base-compose.yml | 4 +- test/e2e-v2/script/env | 2 +- test/stress/cases/istio/istio_suite_test.go | 1 + test/stress/cases/istio/repo.go | 3 +- test/stress/env | 2 +- test/stress/env.dev | 2 +- ui/src/components/Editor/index.vue | 6 ++- 39 files changed, 382 insertions(+), 150 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0d98483c..58fcd220 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -15,6 +15,7 @@ Release Notes. - Implement the remote queue to spreading data to data nodes. - Fix parse environment variables error - Implement the distributed query engine. +- Add mod revision check to write requests. ### Bugs diff --git a/api/proto/banyandb/database/v1/rpc.proto b/api/proto/banyandb/database/v1/rpc.proto index 1fbccd5d..09ffada3 100644 --- a/api/proto/banyandb/database/v1/rpc.proto +++ b/api/proto/banyandb/database/v1/rpc.proto @@ -32,13 +32,17 @@ message StreamRegistryServiceCreateRequest { banyandb.database.v1.Stream stream = 1; } -message StreamRegistryServiceCreateResponse {} +message StreamRegistryServiceCreateResponse { + int64 mod_revision = 1; +} message StreamRegistryServiceUpdateRequest { banyandb.database.v1.Stream stream = 1; } -message StreamRegistryServiceUpdateResponse {} +message StreamRegistryServiceUpdateResponse { + int64 mod_revision = 1; +} message StreamRegistryServiceDeleteRequest { banyandb.common.v1.Metadata metadata = 1; @@ -260,13 +264,17 @@ message MeasureRegistryServiceCreateRequest { banyandb.database.v1.Measure measure = 1; } -message MeasureRegistryServiceCreateResponse {} +message MeasureRegistryServiceCreateResponse { + int64 mod_revision = 1; +} message MeasureRegistryServiceUpdateRequest { banyandb.database.v1.Measure measure = 1; } -message MeasureRegistryServiceUpdateResponse {} +message MeasureRegistryServiceUpdateResponse { + int64 mod_revision = 1; +} message MeasureRegistryServiceDeleteRequest { banyandb.common.v1.Metadata metadata = 1; diff --git a/api/proto/banyandb/measure/v1/write.proto b/api/proto/banyandb/measure/v1/write.proto index 49a7a004..9d007334 100644 --- a/api/proto/banyandb/measure/v1/write.proto +++ b/api/proto/banyandb/measure/v1/write.proto @@ -21,6 +21,7 @@ package banyandb.measure.v1; import "banyandb/common/v1/common.proto"; import "banyandb/model/v1/common.proto"; +import "banyandb/model/v1/write.proto"; import "google/protobuf/timestamp.proto"; import "validate/validate.proto"; @@ -43,10 +44,19 @@ message WriteRequest { common.v1.Metadata metadata = 1 [(validate.rules).message.required = true]; // the data_point is required. DataPointValue data_point = 2 [(validate.rules).message.required = true]; + // the message_id is required. + uint64 message_id = 3 [(validate.rules).uint64.gt = 0]; } // WriteResponse is the response contract for write -message WriteResponse {} +message WriteResponse { + // the message_id from request. + uint64 message_id = 1 [(validate.rules).uint64.gt = 0]; + // status indicates the request processing result + model.v1.Status status = 2 [(validate.rules).enum.defined_only = true]; + // the metadata from request when request fails + common.v1.Metadata metadata = 3 [(validate.rules).message.required = true]; +} message InternalWriteRequest { uint32 shard_id = 1; diff --git a/api/proto/banyandb/model/v1/write.proto b/api/proto/banyandb/model/v1/write.proto new file mode 100644 index 00000000..83b8c91e --- /dev/null +++ b/api/proto/banyandb/model/v1/write.proto @@ -0,0 +1,33 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto3"; + +package banyandb.model.v1; + +option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"; +option java_package = "org.apache.skywalking.banyandb.model.v1"; + +// Status is the response status for write +enum Status { + STATUS_UNSPECIFIED = 0; + STATUS_SUCCEED = 1; + STATUS_INVALID_TIMESTAMP = 2; + STATUS_NOT_FOUND = 3; + STATUS_EXPIRED_SCHEMA = 4; + STATUS_INTERNAL_ERROR = 5; +} diff --git a/api/proto/banyandb/stream/v1/write.proto b/api/proto/banyandb/stream/v1/write.proto index e177eb11..1c3347f1 100644 --- a/api/proto/banyandb/stream/v1/write.proto +++ b/api/proto/banyandb/stream/v1/write.proto @@ -21,6 +21,7 @@ package banyandb.stream.v1; import "banyandb/common/v1/common.proto"; import "banyandb/model/v1/common.proto"; +import "banyandb/model/v1/write.proto"; import "google/protobuf/timestamp.proto"; import "validate/validate.proto"; @@ -39,13 +40,22 @@ message ElementValue { } message WriteRequest { - // the metadata is only required in the first write. + // the metadata is required. common.v1.Metadata metadata = 1 [(validate.rules).message.required = true]; // the element is required. ElementValue element = 2 [(validate.rules).message.required = true]; + // the message_id is required. + uint64 message_id = 3 [(validate.rules).uint64.gt = 0]; } -message WriteResponse {} +message WriteResponse { + // the message_id from request. + uint64 message_id = 1 [(validate.rules).uint64.gt = 0]; + // status indicates the request processing result + model.v1.Status status = 2 [(validate.rules).enum.defined_only = true]; + // the metadata from request when request fails + common.v1.Metadata metadata = 3 [(validate.rules).message.required = true]; +} message InternalWriteRequest { uint32 shard_id = 1; diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index e2fb24c9..823183b1 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -231,14 +231,17 @@ type entityRepo struct { func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { var el partition.EntityLocator var id identity + var modRevision int64 switch schemaMetadata.Kind { case schema.KindMeasure: measure := schemaMetadata.Spec.(*databasev1.Measure) - el = partition.NewEntityLocator(measure.TagFamilies, measure.Entity) + modRevision = measure.GetMetadata().GetModRevision() + el = partition.NewEntityLocator(measure.TagFamilies, measure.Entity, modRevision) id = getID(measure.GetMetadata()) case schema.KindStream: stream := schemaMetadata.Spec.(*databasev1.Stream) - el = partition.NewEntityLocator(stream.TagFamilies, stream.Entity) + modRevision = stream.GetMetadata().GetModRevision() + el = partition.NewEntityLocator(stream.TagFamilies, stream.Entity, modRevision) id = getID(stream.GetMetadata()) default: return @@ -259,8 +262,8 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { Str("kind", kind). Msg("entity added or updated") } - en := make(partition.EntityLocator, 0, len(el)) - for _, l := range el { + en := make([]partition.TagLocator, 0, len(el.TagLocators)) + for _, l := range el.TagLocators { en = append(en, partition.TagLocator{ FamilyOffset: l.FamilyOffset, TagOffset: l.TagOffset, @@ -268,7 +271,7 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { } e.RWMutex.Lock() defer e.RWMutex.Unlock() - e.entitiesMap[id] = en + e.entitiesMap[id] = partition.EntityLocator{TagLocators: en, ModRevision: modRevision} } // OnDelete implements schema.EventHandler. @@ -310,7 +313,7 @@ func (e *entityRepo) getLocator(id identity) (partition.EntityLocator, bool) { defer e.RWMutex.RUnlock() el, ok := e.entitiesMap[id] if !ok { - return nil, false + return partition.EntityLocator{}, false } return el, true } diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 8a541147..90cf7632 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -28,7 +28,9 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/accesslog" "github.com/apache/skywalking-banyandb/pkg/bus" @@ -56,8 +58,8 @@ func (ms *measureService) activeIngestionAccessLog(root string) (err error) { } func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error { - reply := func(measure measurev1.MeasureService_WriteServer, logger *logger.Logger) { - if errResp := measure.Send(&measurev1.WriteResponse{}); errResp != nil { + reply := func(metadata *commonv1.Metadata, status modelv1.Status, messageId uint64, measure measurev1.MeasureService_WriteServer, logger *logger.Logger) { + if errResp := measure.Send(&measurev1.WriteResponse{Metadata: metadata, Status: status, MessageId: messageId}); errResp != nil { logger.Err(errResp).Msg("failed to send response") } } @@ -76,18 +78,28 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er } if err != nil { ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to receive message") - reply(measure, ms.sampled) - continue + return err } if errTime := timestamp.CheckPb(writeRequest.DataPoint.Timestamp); errTime != nil { ms.sampled.Error().Err(errTime).Stringer("written", writeRequest).Msg("the data point time is invalid") - reply(measure, ms.sampled) + reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure, ms.sampled) + continue + } + measureCache, existed := ms.entityRepo.getLocator(getID(writeRequest.GetMetadata())) + if !existed { + ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to measure schema not found") + reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, ms.sampled) + continue + } + if writeRequest.Metadata.ModRevision != measureCache.ModRevision { + ms.sampled.Error().Stringer("written", writeRequest).Msg("the measure schema is expired") + reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure, ms.sampled) continue } entity, tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies()) if err != nil { ms.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to navigate to the write target") - reply(measure, ms.sampled) + reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled) continue } if ms.ingestionAccessLog != nil { @@ -107,7 +119,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er if errWritePub != nil { ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to send a message") } - reply(measure, ms.sampled) + reply(nil, modelv1.Status_STATUS_SUCCEED, writeRequest.GetMessageId(), measure, ms.sampled) } } diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go index 93e49138..511d4e9b 100644 --- a/banyand/liaison/grpc/registry.go +++ b/banyand/liaison/grpc/registry.go @@ -38,19 +38,25 @@ type streamRegistryServer struct { func (rs *streamRegistryServer) Create(ctx context.Context, req *databasev1.StreamRegistryServiceCreateRequest, ) (*databasev1.StreamRegistryServiceCreateResponse, error) { - if err := rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream()); err != nil { + modRevision, err := rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream()) + if err != nil { return nil, err } - return &databasev1.StreamRegistryServiceCreateResponse{}, nil + return &databasev1.StreamRegistryServiceCreateResponse{ + ModRevision: modRevision, + }, nil } func (rs *streamRegistryServer) Update(ctx context.Context, req *databasev1.StreamRegistryServiceUpdateRequest, ) (*databasev1.StreamRegistryServiceUpdateResponse, error) { - if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream()); err != nil { + modRevision, err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream()) + if err != nil { return nil, err } - return &databasev1.StreamRegistryServiceUpdateResponse{}, nil + return &databasev1.StreamRegistryServiceUpdateResponse{ + ModRevision: modRevision, + }, nil } func (rs *streamRegistryServer) Delete(ctx context.Context, @@ -287,19 +293,25 @@ type measureRegistryServer struct { func (rs *measureRegistryServer) Create(ctx context.Context, req *databasev1.MeasureRegistryServiceCreateRequest) ( *databasev1.MeasureRegistryServiceCreateResponse, error, ) { - if err := rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure()); err != nil { + modRevision, err := rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure()) + if err != nil { return nil, err } - return &databasev1.MeasureRegistryServiceCreateResponse{}, nil + return &databasev1.MeasureRegistryServiceCreateResponse{ + ModRevision: modRevision, + }, nil } func (rs *measureRegistryServer) Update(ctx context.Context, req *databasev1.MeasureRegistryServiceUpdateRequest) ( *databasev1.MeasureRegistryServiceUpdateResponse, error, ) { - if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure()); err != nil { + modRevision, err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure()) + if err != nil { return nil, err } - return &databasev1.MeasureRegistryServiceUpdateResponse{}, nil + return &databasev1.MeasureRegistryServiceUpdateResponse{ + ModRevision: modRevision, + }, nil } func (rs *measureRegistryServer) Delete(ctx context.Context, req *databasev1.MeasureRegistryServiceDeleteRequest) ( diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index e4cc1ed1..d188f975 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -28,6 +28,8 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/accesslog" @@ -56,8 +58,8 @@ func (s *streamService) activeIngestionAccessLog(root string) (err error) { } func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { - reply := func(stream streamv1.StreamService_WriteServer, logger *logger.Logger) { - if errResp := stream.Send(&streamv1.WriteResponse{}); errResp != nil { + reply := func(metadata *commonv1.Metadata, status modelv1.Status, messageId uint64, stream streamv1.StreamService_WriteServer, logger *logger.Logger) { + if errResp := stream.Send(&streamv1.WriteResponse{Metadata: metadata, Status: status, MessageId: messageId}); errResp != nil { logger.Err(errResp).Msg("failed to send response") } } @@ -76,18 +78,28 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { } if err != nil { s.sampled.Error().Stringer("written", writeEntity).Err(err).Msg("failed to receive message") - reply(stream, s.sampled) - continue + return err } if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil { s.sampled.Error().Stringer("written", writeEntity).Err(errTime).Msg("the element time is invalid") - reply(stream, s.sampled) + reply(nil, modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetMessageId(), stream, s.sampled) + continue + } + streamCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata())) + if !existed { + s.sampled.Error().Err(err).Stringer("written", writeEntity).Msg("failed to stream schema not found") + reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.sampled) + continue + } + if writeEntity.Metadata.ModRevision != streamCache.ModRevision { + s.sampled.Error().Stringer("written", writeEntity).Msg("the stream schema is expired") + reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.sampled) continue } entity, tagValues, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies()) if err != nil { s.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to navigate to the write target") - reply(stream, s.sampled) + reply(nil, modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled) continue } if s.ingestionAccessLog != nil { @@ -108,7 +120,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { if errWritePub != nil { s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to send a message") } - reply(stream, s.sampled) + reply(nil, modelv1.Status_STATUS_SUCCEED, writeEntity.GetMessageId(), stream, s.sampled) } } diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go index fe516512..15780b54 100644 --- a/banyand/measure/measure_topn.go +++ b/banyand/measure/measure_topn.go @@ -182,7 +182,8 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin measureID := group + "_" + strconv.Itoa(rankNum) + "_" + timeBucket iwr := &measurev1.InternalWriteRequest{ Request: &measurev1.WriteRequest{ - Metadata: t.topNSchema.GetMetadata(), + MessageId: uint64(time.Now().UnixNano()), + Metadata: t.topNSchema.GetMetadata(), DataPoint: &measurev1.DataPointValue{ Timestamp: timestamppb.New(eventTime), TagFamilies: []*modelv1.TagFamilyForWrite{ diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go index 3235b5c6..5384480f 100644 --- a/banyand/measure/measure_write.go +++ b/banyand/measure/measure_write.go @@ -136,6 +136,7 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb Request: &measurev1.WriteRequest{ Metadata: s.GetSchema().Metadata, DataPoint: value, + MessageId: uint64(time.Now().UnixNano()), }, EntityValues: entityValues[1:], }) diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 26a6b648..053abee3 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -219,7 +219,7 @@ func createOrUpdateTopNMeasure(ctx context.Context, measureSchemaRegistry schema Fields: []*databasev1.FieldSpec{TopNValueFieldSpec}, } if oldTopNSchema == nil { - if innerErr := measureSchemaRegistry.CreateMeasure(ctx, newTopNMeasure); innerErr != nil { + if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, newTopNMeasure); innerErr != nil { return nil, innerErr } return newTopNMeasure, nil @@ -233,7 +233,7 @@ func createOrUpdateTopNMeasure(ctx context.Context, measureSchemaRegistry schema return oldTopNSchema, nil } // update - if err = measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure); err != nil { + if _, err = measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure); err != nil { return nil, err } return newTopNMeasure, nil diff --git a/banyand/measure/metadata_test.go b/banyand/measure/metadata_test.go index f6cda0f1..808a123d 100644 --- a/banyand/measure/metadata_test.go +++ b/banyand/measure/metadata_test.go @@ -122,7 +122,9 @@ var _ = Describe("Metadata", func() { measureSchema.Entity.TagNames = measureSchema.Entity.TagNames[1:] entitySize := len(measureSchema.Entity.TagNames) - Expect(svcs.metadataService.MeasureRegistry().UpdateMeasure(context.TODO(), measureSchema)).Should(Succeed()) + modRevision, err := svcs.metadataService.MeasureRegistry().UpdateMeasure(context.TODO(), measureSchema) + Expect(modRevision).ShouldNot(BeZero()) + Expect(err).ShouldNot(HaveOccurred()) Eventually(func() bool { val, err := svcs.measure.Measure(&commonv1.Metadata{ diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go index f53a718c..d06d865e 100644 --- a/banyand/metadata/schema/etcd.go +++ b/banyand/metadata/schema/etcd.go @@ -185,88 +185,92 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto. // update will first ensure the existence of the entity with the metadata, // and overwrite the existing value if so. // Otherwise, it will return ErrGRPCResourceNotFound. -func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) error { +func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) (int64, error) { if !e.closer.AddRunning() { - return ErrClosed + return 0, ErrClosed } defer e.closer.Done() key, err := metadata.key() if err != nil { - return err + return 0, err } key = e.prependNamespace(key) getResp, err := e.client.Get(ctx, key) if err != nil { - return err + return 0, err } if getResp.Count > 1 { - return errUnexpectedNumberOfEntities + return 0, errUnexpectedNumberOfEntities } val, err := proto.Marshal(metadata.Spec.(proto.Message)) if err != nil { - return err + return 0, err } replace := getResp.Count > 0 - if replace { - existingVal, innerErr := metadata.Kind.Unmarshal(getResp.Kvs[0]) - if innerErr != nil { - return innerErr - } - // directly return if we have the same entity - if metadata.equal(existingVal) { - return nil - } + if !replace { + return 0, ErrGRPCResourceNotFound + } + existingVal, innerErr := metadata.Kind.Unmarshal(getResp.Kvs[0]) + if innerErr != nil { + return 0, innerErr + } + // directly return if we have the same entity + if metadata.equal(existingVal) { + return 0, nil + } - modRevision := getResp.Kvs[0].ModRevision - txnResp, txnErr := e.client.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", modRevision)). - Then(clientv3.OpPut(key, string(val))). - Commit() - if txnErr != nil { - return txnErr - } - if !txnResp.Succeeded { - return errConcurrentModification - } - } else { - return ErrGRPCResourceNotFound + modRevision := metadata.ModRevision + if modRevision == 0 { + modRevision = getResp.Kvs[0].ModRevision } - return nil + txnResp, txnErr := e.client.Txn(ctx). + If(clientv3.Compare(clientv3.ModRevision(key), "=", modRevision)). + Then(clientv3.OpPut(key, string(val))). + Commit() + if txnErr != nil { + return 0, txnErr + } + if !txnResp.Succeeded { + return 0, errConcurrentModification + } + + return txnResp.Responses[0].GetResponsePut().Header.Revision, nil } // create will first check existence of the entity with the metadata, // and put the value if it does not exist. // Otherwise, it will return ErrGRPCAlreadyExists. -func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) error { +func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) (int64, error) { if !e.closer.AddRunning() { - return ErrClosed + return 0, ErrClosed } defer e.closer.Done() key, err := metadata.key() if err != nil { - return err + return 0, err } key = e.prependNamespace(key) getResp, err := e.client.Get(ctx, key) if err != nil { - return err + return 0, err } if getResp.Count > 1 { - return errUnexpectedNumberOfEntities + return 0, errUnexpectedNumberOfEntities } val, err := proto.Marshal(metadata.Spec.(proto.Message)) if err != nil { - return err + return 0, err } replace := getResp.Count > 0 if replace { - return errGRPCAlreadyExists + return 0, errGRPCAlreadyExists } - _, err = e.client.Put(ctx, key, string(val)) + putResp, err := e.client.Put(ctx, key, string(val)) if err != nil { - return err + return 0, err } - return nil + + return putResp.Header.Revision, nil } func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix string, kind Kind) ([]proto.Message, error) { diff --git a/banyand/metadata/schema/etcd_test.go b/banyand/metadata/schema/etcd_test.go index fc98664f..e4fc3913 100644 --- a/banyand/metadata/schema/etcd_test.go +++ b/banyand/metadata/schema/etcd_test.go @@ -77,7 +77,7 @@ func preloadSchema(e Registry) error { if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil { return err } - err := e.CreateStream(context.Background(), s) + _, err := e.CreateStream(context.Background(), s) if err != nil { return err } diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go index 5afa54d2..0ef85e59 100644 --- a/banyand/metadata/schema/group.go +++ b/banyand/metadata/schema/group.go @@ -76,23 +76,25 @@ func (e *etcdSchemaRegistry) CreateGroup(ctx context.Context, group *commonv1.Gr if group.UpdatedAt != nil { group.UpdatedAt = timestamppb.Now() } - return e.create(ctx, Metadata{ + _, err := e.create(ctx, Metadata{ TypeMeta: TypeMeta{ Kind: KindGroup, Name: group.GetMetadata().GetName(), }, Spec: group, }) + return err } func (e *etcdSchemaRegistry) UpdateGroup(ctx context.Context, group *commonv1.Group) error { - return e.update(ctx, Metadata{ + _, err := e.update(ctx, Metadata{ TypeMeta: TypeMeta{ Kind: KindGroup, Name: group.GetMetadata().GetName(), }, Spec: group, }) + return err } func formatGroupKey(group string) string { diff --git a/banyand/metadata/schema/index.go b/banyand/metadata/schema/index.go index 41ff541b..41b1b284 100644 --- a/banyand/metadata/schema/index.go +++ b/banyand/metadata/schema/index.go @@ -59,7 +59,7 @@ func (e *etcdSchemaRegistry) CreateIndexRuleBinding(ctx context.Context, indexRu if indexRuleBinding.UpdatedAt != nil { indexRuleBinding.UpdatedAt = timestamppb.Now() } - return e.create(ctx, Metadata{ + _, err := e.create(ctx, Metadata{ TypeMeta: TypeMeta{ Kind: KindIndexRuleBinding, Name: indexRuleBinding.GetMetadata().GetName(), @@ -67,10 +67,11 @@ func (e *etcdSchemaRegistry) CreateIndexRuleBinding(ctx context.Context, indexRu }, Spec: indexRuleBinding, }) + return err } func (e *etcdSchemaRegistry) UpdateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error { - return e.update(ctx, Metadata{ + _, err := e.update(ctx, Metadata{ TypeMeta: TypeMeta{ Kind: KindIndexRuleBinding, Name: indexRuleBinding.GetMetadata().GetName(), @@ -78,6 +79,7 @@ func (e *etcdSchemaRegistry) UpdateIndexRuleBinding(ctx context.Context, indexRu }, Spec: indexRuleBinding, }) + return err } func (e *etcdSchemaRegistry) DeleteIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { @@ -129,7 +131,7 @@ func (e *etcdSchemaRegistry) CreateIndexRule(ctx context.Context, indexRule *dat buf = append(buf, indexRule.Metadata.Name...) indexRule.Metadata.Id = crc32.ChecksumIEEE(buf) } - return e.create(ctx, Metadata{ + _, err := e.create(ctx, Metadata{ TypeMeta: TypeMeta{ Kind: KindIndexRule, Name: indexRule.GetMetadata().GetName(), @@ -137,6 +139,7 @@ func (e *etcdSchemaRegistry) CreateIndexRule(ctx context.Context, indexRule *dat }, Spec: indexRule, }) + return err } func (e *etcdSchemaRegistry) UpdateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error { @@ -147,7 +150,7 @@ func (e *etcdSchemaRegistry) UpdateIndexRule(ctx context.Context, indexRule *dat } indexRule.Metadata.Id = existingIndexRule.Metadata.Id } - return e.update(ctx, Metadata{ + _, err := e.update(ctx, Metadata{ TypeMeta: TypeMeta{ Kind: KindIndexRule, Name: indexRule.GetMetadata().GetName(), @@ -155,6 +158,7 @@ func (e *etcdSchemaRegistry) UpdateIndexRule(ctx context.Context, indexRule *dat }, Spec: indexRule, }) + return err } func (e *etcdSchemaRegistry) DeleteIndexRule(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { diff --git a/banyand/metadata/schema/measure.go b/banyand/metadata/schema/measure.go index 5b0b6f50..a17f8fcd 100644 --- a/banyand/metadata/schema/measure.go +++ b/banyand/metadata/schema/measure.go @@ -56,13 +56,13 @@ func (e *etcdSchemaRegistry) ListMeasure(ctx context.Context, opt ListOpt) ([]*d return entities, nil } -func (e *etcdSchemaRegistry) CreateMeasure(ctx context.Context, measure *databasev1.Measure) error { +func (e *etcdSchemaRegistry) CreateMeasure(ctx context.Context, measure *databasev1.Measure) (int64, error) { if measure.UpdatedAt != nil { measure.UpdatedAt = timestamppb.Now() } if measure.GetInterval() != "" { if _, err := timestamp.ParseDuration(measure.GetInterval()); err != nil { - return errors.Wrap(err, "interval is malformed") + return 0, errors.Wrap(err, "interval is malformed") } } return e.create(ctx, Metadata{ @@ -75,17 +75,18 @@ func (e *etcdSchemaRegistry) CreateMeasure(ctx context.Context, measure *databas }) } -func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error { +func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databasev1.Measure) (int64, error) { if measure.GetInterval() != "" { if _, err := timestamp.ParseDuration(measure.GetInterval()); err != nil { - return errors.Wrap(err, "interval is malformed") + return 0, errors.Wrap(err, "interval is malformed") } } return e.update(ctx, Metadata{ TypeMeta: TypeMeta{ - Kind: KindMeasure, - Group: measure.GetMetadata().GetGroup(), - Name: measure.GetMetadata().GetName(), + Kind: KindMeasure, + Group: measure.GetMetadata().GetGroup(), + Name: measure.GetMetadata().GetName(), + ModRevision: measure.GetMetadata().GetModRevision(), }, Spec: measure, }) diff --git a/banyand/metadata/schema/property.go b/banyand/metadata/schema/property.go index bb76f0a8..1456199a 100644 --- a/banyand/metadata/schema/property.go +++ b/banyand/metadata/schema/property.go @@ -101,7 +101,7 @@ func (e *etcdSchemaRegistry) ApplyProperty(ctx context.Context, property *proper Spec: property, } tagsNum := uint32(len(property.Tags)) - err := e.create(ctx, md) + _, err := e.create(ctx, md) if err == nil { return true, tagsNum, nil } @@ -125,7 +125,7 @@ func (e *etcdSchemaRegistry) ApplyProperty(ctx context.Context, property *proper existed.Tags = append(existed.Tags, property.Tags...) md.Spec = existed } - if err = e.update(ctx, md); err != nil { + if _, err = e.update(ctx, md); err != nil { return false, 0, err } return false, tagsNum, nil diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go index d5dc0185..6806b1a5 100644 --- a/banyand/metadata/schema/schema.go +++ b/banyand/metadata/schema/schema.go @@ -60,9 +60,10 @@ type Registry interface { // TypeMeta defines the identity and type of an Event. type TypeMeta struct { - Name string - Group string - Kind Kind + Name string + Group string + ModRevision int64 + Kind Kind } // Metadata wrap dedicated serialized resource and its TypeMeta. @@ -136,8 +137,8 @@ func (m Metadata) equal(other Metadata) bool { type Stream interface { GetStream(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Stream, error) ListStream(ctx context.Context, opt ListOpt) ([]*databasev1.Stream, error) - CreateStream(ctx context.Context, stream *databasev1.Stream) error - UpdateStream(ctx context.Context, stream *databasev1.Stream) error + CreateStream(ctx context.Context, stream *databasev1.Stream) (int64, error) + UpdateStream(ctx context.Context, stream *databasev1.Stream) (int64, error) DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool, error) } @@ -163,8 +164,8 @@ type IndexRuleBinding interface { type Measure interface { GetMeasure(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Measure, error) ListMeasure(ctx context.Context, opt ListOpt) ([]*databasev1.Measure, error) - CreateMeasure(ctx context.Context, measure *databasev1.Measure) error - UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error + CreateMeasure(ctx context.Context, measure *databasev1.Measure) (int64, error) + UpdateMeasure(ctx context.Context, measure *databasev1.Measure) (int64, error) DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool, error) TopNAggregations(ctx context.Context, metadata *commonv1.Metadata) ([]*databasev1.TopNAggregation, error) } diff --git a/banyand/metadata/schema/shard.go b/banyand/metadata/schema/shard.go index 8f1c7a42..c6499931 100644 --- a/banyand/metadata/schema/shard.go +++ b/banyand/metadata/schema/shard.go @@ -41,14 +41,15 @@ func (e *etcdSchemaRegistry) CreateOrUpdateShard(ctx context.Context, shard *dat }, Spec: shard, } - err := e.update(ctx, md) + _, err := e.update(ctx, md) if err == nil { return nil } if errors.Is(err, ErrGRPCResourceNotFound) { shard.CreatedAt = shard.UpdatedAt md.Spec = shard - return e.create(ctx, md) + _, err = e.create(ctx, md) + return err } return err } diff --git a/banyand/metadata/schema/stream.go b/banyand/metadata/schema/stream.go index d2e2491f..645a635d 100644 --- a/banyand/metadata/schema/stream.go +++ b/banyand/metadata/schema/stream.go @@ -51,25 +51,26 @@ func (e *etcdSchemaRegistry) ListStream(ctx context.Context, opt ListOpt) ([]*da return entities, nil } -func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream *databasev1.Stream) error { +func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream *databasev1.Stream) (int64, error) { return e.update(ctx, Metadata{ TypeMeta: TypeMeta{ - Kind: KindStream, - Group: stream.GetMetadata().GetGroup(), - Name: stream.GetMetadata().GetName(), + Kind: KindStream, + Group: stream.GetMetadata().GetGroup(), + Name: stream.GetMetadata().GetName(), + ModRevision: stream.GetMetadata().GetModRevision(), }, Spec: stream, }) } -func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream *databasev1.Stream) error { +func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream *databasev1.Stream) (int64, error) { if stream.UpdatedAt != nil { stream.UpdatedAt = timestamppb.Now() } group := stream.Metadata.GetGroup() _, err := e.GetGroup(ctx, group) if err != nil { - return err + return 0, err } return e.create(ctx, Metadata{ TypeMeta: TypeMeta{ diff --git a/banyand/metadata/schema/topn.go b/banyand/metadata/schema/topn.go index 858fdf04..60597f1a 100644 --- a/banyand/metadata/schema/topn.go +++ b/banyand/metadata/schema/topn.go @@ -55,7 +55,7 @@ func (e *etcdSchemaRegistry) CreateTopNAggregation(ctx context.Context, topNAggr if topNAggregation.UpdatedAt != nil { topNAggregation.UpdatedAt = timestamppb.Now() } - return e.create(ctx, Metadata{ + _, err := e.create(ctx, Metadata{ TypeMeta: TypeMeta{ Kind: KindTopNAggregation, Group: topNAggregation.GetMetadata().GetGroup(), @@ -63,10 +63,11 @@ func (e *etcdSchemaRegistry) CreateTopNAggregation(ctx context.Context, topNAggr }, Spec: topNAggregation, }) + return err } func (e *etcdSchemaRegistry) UpdateTopNAggregation(ctx context.Context, topNAggregation *databasev1.TopNAggregation) error { - return e.update(ctx, Metadata{ + _, err := e.update(ctx, Metadata{ TypeMeta: TypeMeta{ Kind: KindTopNAggregation, Group: topNAggregation.GetMetadata().GetGroup(), @@ -74,6 +75,7 @@ func (e *etcdSchemaRegistry) UpdateTopNAggregation(ctx context.Context, topNAggr }, Spec: topNAggregation, }) + return err } func (e *etcdSchemaRegistry) DeleteTopNAggregation(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { diff --git a/banyand/metadata/schema/watcher_test.go b/banyand/metadata/schema/watcher_test.go index a796c4bb..cc5b2f45 100644 --- a/banyand/metadata/schema/watcher_test.go +++ b/banyand/metadata/schema/watcher_test.go @@ -118,13 +118,15 @@ var _ = ginkgo.Describe("Watcher", func() { }, }) gomega.Expect(err).NotTo(gomega.HaveOccurred()) + var modRevision int64 for i := 0; i < 2; i++ { - err = registry.CreateMeasure(context.Background(), &databasev1.Measure{ + modRevision, err = registry.CreateMeasure(context.Background(), &databasev1.Measure{ Metadata: &commonv1.Metadata{ Name: fmt.Sprintf("testkey%d", i+1), Group: "testgroup-measure", }, }) + gomega.Expect(modRevision).ShouldNot(gomega.BeZero()) gomega.Expect(err).NotTo(gomega.HaveOccurred()) } @@ -156,12 +158,14 @@ var _ = ginkgo.Describe("Watcher", func() { }, }) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - err = registry.CreateStream(context.Background(), &databasev1.Stream{ + var modRevision int64 + modRevision, err = registry.CreateStream(context.Background(), &databasev1.Stream{ Metadata: &commonv1.Metadata{ Name: "testkey", Group: "testgroup-stream", }, }) + gomega.Expect(modRevision).ShouldNot(gomega.BeZero()) gomega.Expect(err).NotTo(gomega.HaveOccurred()) ok, err := registry.DeleteStream(context.Background(), &commonv1.Metadata{ Name: "testkey", diff --git a/banyand/stream/metadata_test.go b/banyand/stream/metadata_test.go index 8c4f3bfb..f0133ea5 100644 --- a/banyand/stream/metadata_test.go +++ b/banyand/stream/metadata_test.go @@ -122,7 +122,9 @@ var _ = Describe("Metadata", func() { streamSchema.Entity.TagNames = streamSchema.Entity.TagNames[1:] entitySize := len(streamSchema.Entity.TagNames) - Expect(svcs.metadataService.StreamRegistry().UpdateStream(context.TODO(), streamSchema)).Should(Succeed()) + modRevision, err := svcs.metadataService.StreamRegistry().UpdateStream(context.TODO(), streamSchema) + Expect(modRevision).ShouldNot(BeZero()) + Expect(err).ShouldNot(HaveOccurred()) Eventually(func() bool { val, ok := svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{ diff --git a/docs/api-reference.md b/docs/api-reference.md index 96379185..0ac80eac 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -168,6 +168,9 @@ - [TopNRequest](#banyandb-measure-v1-TopNRequest) - [TopNResponse](#banyandb-measure-v1-TopNResponse) +- [banyandb/model/v1/write.proto](#banyandb_model_v1_write-proto) + - [Status](#banyandb-model-v1-Status) + - [banyandb/measure/v1/write.proto](#banyandb_measure_v1_write-proto) - [DataPointValue](#banyandb-measure-v1-DataPointValue) - [InternalWriteRequest](#banyandb-measure-v1-InternalWriteRequest) @@ -1671,6 +1674,11 @@ Type determine the index structure under the hood +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| mod_revision | [int64](#int64) | | | + + @@ -1817,6 +1825,11 @@ Type determine the index structure under the hood +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| mod_revision | [int64](#int64) | | | + + @@ -1842,6 +1855,11 @@ Type determine the index structure under the hood +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| mod_revision | [int64](#int64) | | | + + @@ -1988,6 +2006,11 @@ Type determine the index structure under the hood +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| mod_revision | [int64](#int64) | | | + + @@ -2498,6 +2521,38 @@ TopNResponse is the response for a query to the Query module. +<a name="banyandb_model_v1_write-proto"></a> +<p align="right"><a href="#top">Top</a></p> + +## banyandb/model/v1/write.proto + + + + + +<a name="banyandb-model-v1-Status"></a> + +### Status +Status is the response status for write + +| Name | Number | Description | +| ---- | ------ | ----------- | +| STATUS_UNSPECIFIED | 0 | | +| STATUS_SUCCEED | 1 | | +| STATUS_INVALID_TIMESTAMP | 2 | | +| STATUS_NOT_FOUND | 3 | | +| STATUS_EXPIRED_SCHEMA | 4 | | +| STATUS_INTERNAL_ERROR | 5 | | + + + + + + + + + + <a name="banyandb_measure_v1_write-proto"></a> <p align="right"><a href="#top">Top</a></p> @@ -2550,6 +2605,7 @@ WriteRequest is the request contract for write | ----- | ---- | ----- | ----------- | | metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata is required. | | data_point | [DataPointValue](#banyandb-measure-v1-DataPointValue) | | the data_point is required. | +| message_id | [uint64](#uint64) | | the message_id is required. | @@ -2562,6 +2618,13 @@ WriteRequest is the request contract for write WriteResponse is the response contract for write +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| message_id | [uint64](#uint64) | | the message_id from request. | +| status | [banyandb.model.v1.Status](#banyandb-model-v1-Status) | | status indicates the request processing result | +| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata from request when request fails | + + @@ -2942,8 +3005,9 @@ QueryResponse is the response for a query to the Query module. | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata is only required in the first write. | +| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata is required. | | element | [ElementValue](#banyandb-stream-v1-ElementValue) | | the element is required. | +| message_id | [uint64](#uint64) | | the message_id is required. | @@ -2956,6 +3020,13 @@ QueryResponse is the response for a query to the Query module. +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| message_id | [uint64](#uint64) | | the message_id from request. | +| status | [banyandb.model.v1.Status](#banyandb-model-v1-Status) | | status indicates the request processing result | +| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata from request when request fails | + + diff --git a/pkg/partition/entity.go b/pkg/partition/entity.go index f5a86100..22433fc6 100644 --- a/pkg/partition/entity.go +++ b/pkg/partition/entity.go @@ -33,7 +33,10 @@ import ( var ErrMalformedElement = errors.New("element is malformed") // EntityLocator combines several TagLocators that help find the entity value. -type EntityLocator []TagLocator +type EntityLocator struct { + TagLocators []TagLocator + ModRevision int64 +} // TagLocator contains offsets to retrieve a tag swiftly. type TagLocator struct { @@ -42,22 +45,22 @@ type TagLocator struct { } // NewEntityLocator return a EntityLocator based on tag family spec and entity spec. -func NewEntityLocator(families []*databasev1.TagFamilySpec, entity *databasev1.Entity) EntityLocator { - locator := make(EntityLocator, 0, len(entity.GetTagNames())) +func NewEntityLocator(families []*databasev1.TagFamilySpec, entity *databasev1.Entity, modRevision int64) EntityLocator { + locator := make([]TagLocator, 0, len(entity.GetTagNames())) for _, tagInEntity := range entity.GetTagNames() { fIndex, tIndex, tag := pbv1.FindTagByName(families, tagInEntity) if tag != nil { locator = append(locator, TagLocator{FamilyOffset: fIndex, TagOffset: tIndex}) } } - return locator + return EntityLocator{TagLocators: locator, ModRevision: modRevision} } // Find the entity from a tag family, prepend a subject to the entity. func (e EntityLocator) Find(subject string, value []*modelv1.TagFamilyForWrite) (tsdb.Entity, tsdb.EntityValues, error) { - entityValues := make(tsdb.EntityValues, len(e)+1) + entityValues := make(tsdb.EntityValues, len(e.TagLocators)+1) entityValues[0] = tsdb.StrValue(subject) - for i, index := range e { + for i, index := range e.TagLocators { tag, err := GetTagByOffset(value, index.FamilyOffset, index.TagOffset) if err != nil { return nil, nil, err diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go index 4297c296..9d197df4 100644 --- a/pkg/query/logical/common.go +++ b/pkg/query/logical/common.go @@ -39,7 +39,6 @@ var ( errUnsupportedConditionValue = errors.New("unsupported condition value type") errInvalidCriteriaType = errors.New("invalid criteria type") errIndexNotDefined = errors.New("index is not define for the tag") - errInvalidData = errors.New("data is invalid") nullTag = &modelv1.TagValue{Value: &modelv1.TagValue_Null{}} ) @@ -60,22 +59,30 @@ func ProjectItem(ec executor.ExecutionContext, item tsdb.Item, projectionFieldRe if len(refs) == 0 { continue } - tags := make([]*modelv1.Tag, len(refs)) familyName := refs[0].Tag.getFamilyName() parsedTagFamily, err := ec.ParseTagFamily(familyName, item) if err != nil { return nil, errors.WithMessage(err, "parse projection") } - if len(refs) > len(parsedTagFamily.Tags) { - return nil, errors.Wrapf(errInvalidData, - "the number of tags %d in %s is less then expected %d", - len(parsedTagFamily.Tags), familyName, len(refs)) + + parsedTagSize := len(parsedTagFamily.GetTags()) + tagRefSize := len(refs) + + // Determine maximum size for creating the tags slice + maxSize := tagRefSize + if parsedTagSize < tagRefSize { + maxSize = parsedTagSize } + + tags := make([]*modelv1.Tag, maxSize) + for j, ref := range refs { - if len(parsedTagFamily.GetTags()) > ref.Spec.TagIdx { + if parsedTagSize > ref.Spec.TagIdx { tags[j] = parsedTagFamily.GetTags()[ref.Spec.TagIdx] - } else { + } else if j < parsedTagSize { tags[j] = &modelv1.Tag{Key: ref.Tag.name, Value: nullTag} + } else { + break } } diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go index 18e48846..e647cf9f 100644 --- a/pkg/test/measure/etcd.go +++ b/pkg/test/measure/etcd.go @@ -51,7 +51,8 @@ func PreloadSchema(ctx context.Context, e schema.Registry) error { return errors.WithStack(err) } if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure *databasev1.Measure) error { - return e.CreateMeasure(ctx, measure) + _, innerErr := e.CreateMeasure(ctx, measure) + return innerErr }); err != nil { return errors.WithStack(err) } diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go index e7557c31..b796ef01 100644 --- a/pkg/test/stream/etcd.go +++ b/pkg/test/stream/etcd.go @@ -57,7 +57,7 @@ func PreloadSchema(ctx context.Context, e schema.Registry) error { if unmarshalErr := protojson.Unmarshal([]byte(streamJSON), s); unmarshalErr != nil { return unmarshalErr } - if innerErr := e.CreateStream(ctx, s); innerErr != nil { + if _, innerErr := e.CreateStream(ctx, s); innerErr != nil { return innerErr } diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go index 04316efe..8405152f 100644 --- a/test/cases/measure/data/data.go +++ b/test/cases/measure/data/data.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/yaml" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" @@ -105,7 +106,7 @@ func loadData(md *commonv1.Metadata, measure measurev1.MeasureService_WriteClien dataPointValue := &measurev1.DataPointValue{} gm.Expect(protojson.Unmarshal(rawDataPointValue, dataPointValue)).ShouldNot(gm.HaveOccurred()) dataPointValue.Timestamp = timestamppb.New(baseTime.Add(-time.Duration(len(templates)-i-1) * interval)) - gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, DataPoint: dataPointValue})). + gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, DataPoint: dataPointValue, MessageId: uint64(time.Now().UnixNano())})). Should(gm.Succeed()) } } @@ -114,14 +115,21 @@ func loadData(md *commonv1.Metadata, measure measurev1.MeasureService_WriteClien func Write(conn *grpclib.ClientConn, name, group, dataFile string, baseTime time.Time, interval time.Duration, ) { + metadata := &commonv1.Metadata{ + Name: name, + Group: group, + } + + schema := databasev1.NewMeasureRegistryServiceClient(conn) + resp, err := schema.Get(context.Background(), &databasev1.MeasureRegistryServiceGetRequest{Metadata: metadata}) + gm.Expect(err).NotTo(gm.HaveOccurred()) + metadata = resp.GetMeasure().GetMetadata() + c := measurev1.NewMeasureServiceClient(conn) ctx := context.Background() writeClient, err := c.Write(ctx) gm.Expect(err).NotTo(gm.HaveOccurred()) - loadData(&commonv1.Metadata{ - Name: name, - Group: group, - }, writeClient, dataFile, baseTime, interval) + loadData(metadata, writeClient, dataFile, baseTime, interval) gm.Expect(writeClient.CloseSend()).To(gm.Succeed()) gm.Eventually(func() error { _, err := writeClient.Recv() diff --git a/test/cases/stream/data/data.go b/test/cases/stream/data/data.go index cb6cbdee..4e48d18c 100644 --- a/test/cases/stream/data/data.go +++ b/test/cases/stream/data/data.go @@ -37,6 +37,7 @@ import ( "sigs.k8s.io/yaml" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/pkg/test/flags" @@ -97,12 +98,13 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args }) } -func loadData(stream streamv1.StreamService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) { +func loadData(stream streamv1.StreamService_WriteClient, metadata *commonv1.Metadata, dataFile string, baseTime time.Time, interval time.Duration) { var templates []interface{} content, err := dataFS.ReadFile("testdata/" + dataFile) gm.Expect(err).ShouldNot(gm.HaveOccurred()) gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred()) bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+") + for i, template := range templates { rawSearchTagFamily, errMarshal := json.Marshal(template) gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred()) @@ -125,11 +127,9 @@ func loadData(stream streamv1.StreamService_WriteClient, dataFile string, baseTi } e.TagFamilies = append(e.TagFamilies, searchTagFamily) errInner := stream.Send(&streamv1.WriteRequest{ - Metadata: &commonv1.Metadata{ - Name: "sw", - Group: "default", - }, - Element: e, + Metadata: metadata, + Element: e, + MessageId: uint64(time.Now().UnixNano()), }) gm.Expect(errInner).ShouldNot(gm.HaveOccurred()) } @@ -137,11 +137,20 @@ func loadData(stream streamv1.StreamService_WriteClient, dataFile string, baseTi // Write data into the server. func Write(conn *grpclib.ClientConn, dataFile string, baseTime time.Time, interval time.Duration) { + metadata := &commonv1.Metadata{ + Name: "sw", + Group: "default", + } + schema := databasev1.NewStreamRegistryServiceClient(conn) + resp, err := schema.Get(context.Background(), &databasev1.StreamRegistryServiceGetRequest{Metadata: metadata}) + gm.Expect(err).NotTo(gm.HaveOccurred()) + metadata = resp.GetStream().GetMetadata() + c := streamv1.NewStreamServiceClient(conn) ctx := context.Background() writeClient, err := c.Write(ctx) gm.Expect(err).NotTo(gm.HaveOccurred()) - loadData(writeClient, dataFile, baseTime, interval) + loadData(writeClient, metadata, dataFile, baseTime, interval) gm.Expect(writeClient.CloseSend()).To(gm.Succeed()) gm.Eventually(func() error { _, err := writeClient.Recv() diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml index adcd5c08..98f0a428 100644 --- a/test/docker/base-compose.yml +++ b/test/docker/base-compose.yml @@ -33,7 +33,7 @@ services: - sw_agent:/skywalking-java-agent oap: - image: "ghcr.io/apache/skywalking/oap:${SW_OAP_COMMIT}" + image: "ghcr.io/apache/skywalking-banyandb-java-client/oap:${SW_OAP_COMMIT}" expose: - 11800 - 12800 @@ -55,7 +55,7 @@ services: retries: 120 ui: - image: "ghcr.io/apache/skywalking/ui:${SW_OAP_COMMIT}" + image: "ghcr.io/apache/skywalking-banyandb-java-client/ui:${SW_OAP_COMMIT}" expose: - 8080 environment: diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env index 57dc048e..c1d3d0a0 100644 --- a/test/e2e-v2/script/env +++ b/test/e2e-v2/script/env @@ -25,5 +25,5 @@ SW_KUBERNETES_COMMIT_SHA=e2c61c6774cf377b23516fca6f8a1e119d3191c5 SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5 SW_CTL_COMMIT=6b2eb0011e38b630db6af7203db215806bd141ed -SW_OAP_COMMIT=5ad74eb2619b4cebe57a150ca328b2c1d052cb54 +SW_OAP_COMMIT=bb830ee6d3c992aca128eeaf2f8841d2776faf2e SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9 diff --git a/test/stress/cases/istio/istio_suite_test.go b/test/stress/cases/istio/istio_suite_test.go index 88adad25..a2a2a5b6 100644 --- a/test/stress/cases/istio/istio_suite_test.go +++ b/test/stress/cases/istio/istio_suite_test.go @@ -150,6 +150,7 @@ func ReadAndWriteFromFile(filePath string, conn *grpc.ClientConn) error { return fmt.Errorf("failed to unmarshal JSON message: %w", errUnmarshal) } + req.MessageId = uint64(time.Now().UnixNano()) req.DataPoint.Timestamp = timestamppb.New(adjustTime(req.DataPoint.Timestamp.AsTime())) // Write the request to the measureService if errSend := client.Send(&req); errSend != nil { diff --git a/test/stress/cases/istio/repo.go b/test/stress/cases/istio/repo.go index 6f5fe011..6865ba60 100644 --- a/test/stress/cases/istio/repo.go +++ b/test/stress/cases/istio/repo.go @@ -147,7 +147,8 @@ func (p *preloadService) PreRun(_ context.Context) error { return errors.WithStack(err) } if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure *databasev1.Measure) error { - return e.CreateMeasure(context.TODO(), measure) + _, innerErr := e.CreateMeasure(context.TODO(), measure) + return innerErr }); err != nil { return errors.WithStack(err) } diff --git a/test/stress/env b/test/stress/env index 1ff35800..91076f83 100644 --- a/test/stress/env +++ b/test/stress/env @@ -26,7 +26,7 @@ SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772 SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653 SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b -SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0 +SW_OAP_COMMIT=bb830ee6d3c992aca128eeaf2f8841d2776faf2e SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a TARGET=test diff --git a/test/stress/env.dev b/test/stress/env.dev index 604ded57..febe0637 100644 --- a/test/stress/env.dev +++ b/test/stress/env.dev @@ -25,7 +25,7 @@ SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772 SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653 SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b -SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0 +SW_OAP_COMMIT=bb830ee6d3c992aca128eeaf2f8841d2776faf2e SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a TARGET=dev diff --git a/ui/src/components/Editor/index.vue b/ui/src/components/Editor/index.vue index 5c4851a3..4f5a4daa 100644 --- a/ui/src/components/Editor/index.vue +++ b/ui/src/components/Editor/index.vue @@ -55,6 +55,7 @@ const data = reactive({ form: { group: route.params.group, name: route.params.group, + modRevision: route.params.modRevision, interval: 1, intervalUnit: 'ns' } @@ -98,6 +99,7 @@ const options = [ watch(() => route, () => { data.form.group = route.params.group data.form.name = route.params.name + data.form.modRevision = route.params.modRevision data.type = route.params.type + '' data.operator = route.params.operator initData() @@ -149,7 +151,8 @@ const submit = async (formEl: FormInstance | undefined) => { const form = { metadata: { group: data.form.group, - name: data.form.name + name: data.form.name, + modRevision: data.form.modRevision }, tagFamilies: tagFamilies, entity: { @@ -262,6 +265,7 @@ function initData() { data.form.intervalUnit = intervalUnit fieldEditorRef.value.setFields(fields) } + data.form.modRevision = res.data[data.type + ''].metadata.modRevision } }) .finally(() => {