This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 9552688d Add mod revision check to write requests(measure/stream) 
(#322)
9552688d is described below

commit 9552688d60f055f6bb539a91464e8b1db77e7309
Author: hailin0 <wanghai...@apache.org>
AuthorDate: Fri Sep 15 14:21:32 2023 +0800

    Add mod revision check to write requests(measure/stream) (#322)
    
    * Add mod revision check to write requests(measure/stream)
    
    - Support for create/update schema check mod revision
    - Support for write data check mod revision
    - Adapt to web console requests
    - Update OAP e2e image
    - Fix query failure caused by schema change
    
    ---------
    
    Co-authored-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                                  |  1 +
 api/proto/banyandb/database/v1/rpc.proto    | 16 ++++--
 api/proto/banyandb/measure/v1/write.proto   | 12 ++++-
 api/proto/banyandb/model/v1/write.proto     | 33 ++++++++++++
 api/proto/banyandb/stream/v1/write.proto    | 14 ++++-
 banyand/liaison/grpc/discovery.go           | 15 +++---
 banyand/liaison/grpc/measure.go             | 26 ++++++---
 banyand/liaison/grpc/registry.go            | 28 +++++++---
 banyand/liaison/grpc/stream.go              | 26 ++++++---
 banyand/measure/measure_topn.go             |  3 +-
 banyand/measure/measure_write.go            |  1 +
 banyand/measure/metadata.go                 |  4 +-
 banyand/measure/metadata_test.go            |  4 +-
 banyand/metadata/schema/etcd.go             | 82 +++++++++++++++--------------
 banyand/metadata/schema/etcd_test.go        |  2 +-
 banyand/metadata/schema/group.go            |  6 ++-
 banyand/metadata/schema/index.go            | 12 +++--
 banyand/metadata/schema/measure.go          | 15 +++---
 banyand/metadata/schema/property.go         |  4 +-
 banyand/metadata/schema/schema.go           | 15 +++---
 banyand/metadata/schema/shard.go            |  5 +-
 banyand/metadata/schema/stream.go           | 13 ++---
 banyand/metadata/schema/topn.go             |  6 ++-
 banyand/metadata/schema/watcher_test.go     |  8 ++-
 banyand/stream/metadata_test.go             |  4 +-
 docs/api-reference.md                       | 73 ++++++++++++++++++++++++-
 pkg/partition/entity.go                     | 15 +++---
 pkg/query/logical/common.go                 | 23 +++++---
 pkg/test/measure/etcd.go                    |  3 +-
 pkg/test/stream/etcd.go                     |  2 +-
 test/cases/measure/data/data.go             | 18 +++++--
 test/cases/stream/data/data.go              | 23 +++++---
 test/docker/base-compose.yml                |  4 +-
 test/e2e-v2/script/env                      |  2 +-
 test/stress/cases/istio/istio_suite_test.go |  1 +
 test/stress/cases/istio/repo.go             |  3 +-
 test/stress/env                             |  2 +-
 test/stress/env.dev                         |  2 +-
 ui/src/components/Editor/index.vue          |  6 ++-
 39 files changed, 382 insertions(+), 150 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 0d98483c..58fcd220 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,7 @@ Release Notes.
 - Implement the remote queue to spreading data to data nodes.
 - Fix parse environment variables error
 - Implement the distributed query engine.
+- Add mod revision check to write requests.
 
 ### Bugs
 
diff --git a/api/proto/banyandb/database/v1/rpc.proto 
b/api/proto/banyandb/database/v1/rpc.proto
index 1fbccd5d..09ffada3 100644
--- a/api/proto/banyandb/database/v1/rpc.proto
+++ b/api/proto/banyandb/database/v1/rpc.proto
@@ -32,13 +32,17 @@ message StreamRegistryServiceCreateRequest {
   banyandb.database.v1.Stream stream = 1;
 }
 
-message StreamRegistryServiceCreateResponse {}
+message StreamRegistryServiceCreateResponse {
+  int64 mod_revision = 1;
+}
 
 message StreamRegistryServiceUpdateRequest {
   banyandb.database.v1.Stream stream = 1;
 }
 
-message StreamRegistryServiceUpdateResponse {}
+message StreamRegistryServiceUpdateResponse {
+  int64 mod_revision = 1;
+}
 
 message StreamRegistryServiceDeleteRequest {
   banyandb.common.v1.Metadata metadata = 1;
@@ -260,13 +264,17 @@ message MeasureRegistryServiceCreateRequest {
   banyandb.database.v1.Measure measure = 1;
 }
 
-message MeasureRegistryServiceCreateResponse {}
+message MeasureRegistryServiceCreateResponse {
+  int64 mod_revision = 1;
+}
 
 message MeasureRegistryServiceUpdateRequest {
   banyandb.database.v1.Measure measure = 1;
 }
 
-message MeasureRegistryServiceUpdateResponse {}
+message MeasureRegistryServiceUpdateResponse {
+  int64 mod_revision = 1;
+}
 
 message MeasureRegistryServiceDeleteRequest {
   banyandb.common.v1.Metadata metadata = 1;
diff --git a/api/proto/banyandb/measure/v1/write.proto 
b/api/proto/banyandb/measure/v1/write.proto
index 49a7a004..9d007334 100644
--- a/api/proto/banyandb/measure/v1/write.proto
+++ b/api/proto/banyandb/measure/v1/write.proto
@@ -21,6 +21,7 @@ package banyandb.measure.v1;
 
 import "banyandb/common/v1/common.proto";
 import "banyandb/model/v1/common.proto";
+import "banyandb/model/v1/write.proto";
 import "google/protobuf/timestamp.proto";
 import "validate/validate.proto";
 
@@ -43,10 +44,19 @@ message WriteRequest {
   common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
   // the data_point is required.
   DataPointValue data_point = 2 [(validate.rules).message.required = true];
+  // the message_id is required.
+  uint64 message_id = 3 [(validate.rules).uint64.gt = 0];
 }
 
 // WriteResponse is the response contract for write
-message WriteResponse {}
+message WriteResponse {
+  // the message_id from request.
+  uint64 message_id = 1 [(validate.rules).uint64.gt = 0];
+  // status indicates the request processing result
+  model.v1.Status status = 2 [(validate.rules).enum.defined_only = true];
+  // the metadata from request when request fails
+  common.v1.Metadata metadata = 3 [(validate.rules).message.required = true];
+}
 
 message InternalWriteRequest {
   uint32 shard_id = 1;
diff --git a/api/proto/banyandb/model/v1/write.proto 
b/api/proto/banyandb/model/v1/write.proto
new file mode 100644
index 00000000..83b8c91e
--- /dev/null
+++ b/api/proto/banyandb/model/v1/write.proto
@@ -0,0 +1,33 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+package banyandb.model.v1;
+
+option go_package = 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1";
+option java_package = "org.apache.skywalking.banyandb.model.v1";
+
+// Status is the response status for write
+enum Status {
+  STATUS_UNSPECIFIED = 0;
+  STATUS_SUCCEED = 1;
+  STATUS_INVALID_TIMESTAMP = 2;
+  STATUS_NOT_FOUND = 3;
+  STATUS_EXPIRED_SCHEMA = 4;
+  STATUS_INTERNAL_ERROR = 5;
+}
diff --git a/api/proto/banyandb/stream/v1/write.proto 
b/api/proto/banyandb/stream/v1/write.proto
index e177eb11..1c3347f1 100644
--- a/api/proto/banyandb/stream/v1/write.proto
+++ b/api/proto/banyandb/stream/v1/write.proto
@@ -21,6 +21,7 @@ package banyandb.stream.v1;
 
 import "banyandb/common/v1/common.proto";
 import "banyandb/model/v1/common.proto";
+import "banyandb/model/v1/write.proto";
 import "google/protobuf/timestamp.proto";
 import "validate/validate.proto";
 
@@ -39,13 +40,22 @@ message ElementValue {
 }
 
 message WriteRequest {
-  // the metadata is only required in the first write.
+  // the metadata is required.
   common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
   // the element is required.
   ElementValue element = 2 [(validate.rules).message.required = true];
+  // the message_id is required.
+  uint64 message_id = 3 [(validate.rules).uint64.gt = 0];
 }
 
-message WriteResponse {}
+message WriteResponse {
+  // the message_id from request.
+  uint64 message_id = 1 [(validate.rules).uint64.gt = 0];
+  // status indicates the request processing result
+  model.v1.Status status = 2 [(validate.rules).enum.defined_only = true];
+  // the metadata from request when request fails
+  common.v1.Metadata metadata = 3 [(validate.rules).message.required = true];
+}
 
 message InternalWriteRequest {
   uint32 shard_id = 1;
diff --git a/banyand/liaison/grpc/discovery.go 
b/banyand/liaison/grpc/discovery.go
index e2fb24c9..823183b1 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -231,14 +231,17 @@ type entityRepo struct {
 func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
        var el partition.EntityLocator
        var id identity
+       var modRevision int64
        switch schemaMetadata.Kind {
        case schema.KindMeasure:
                measure := schemaMetadata.Spec.(*databasev1.Measure)
-               el = partition.NewEntityLocator(measure.TagFamilies, 
measure.Entity)
+               modRevision = measure.GetMetadata().GetModRevision()
+               el = partition.NewEntityLocator(measure.TagFamilies, 
measure.Entity, modRevision)
                id = getID(measure.GetMetadata())
        case schema.KindStream:
                stream := schemaMetadata.Spec.(*databasev1.Stream)
-               el = partition.NewEntityLocator(stream.TagFamilies, 
stream.Entity)
+               modRevision = stream.GetMetadata().GetModRevision()
+               el = partition.NewEntityLocator(stream.TagFamilies, 
stream.Entity, modRevision)
                id = getID(stream.GetMetadata())
        default:
                return
@@ -259,8 +262,8 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
                        Str("kind", kind).
                        Msg("entity added or updated")
        }
-       en := make(partition.EntityLocator, 0, len(el))
-       for _, l := range el {
+       en := make([]partition.TagLocator, 0, len(el.TagLocators))
+       for _, l := range el.TagLocators {
                en = append(en, partition.TagLocator{
                        FamilyOffset: l.FamilyOffset,
                        TagOffset:    l.TagOffset,
@@ -268,7 +271,7 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
        }
        e.RWMutex.Lock()
        defer e.RWMutex.Unlock()
-       e.entitiesMap[id] = en
+       e.entitiesMap[id] = partition.EntityLocator{TagLocators: en, 
ModRevision: modRevision}
 }
 
 // OnDelete implements schema.EventHandler.
@@ -310,7 +313,7 @@ func (e *entityRepo) getLocator(id identity) 
(partition.EntityLocator, bool) {
        defer e.RWMutex.RUnlock()
        el, ok := e.entitiesMap[id]
        if !ok {
-               return nil, false
+               return partition.EntityLocator{}, false
        }
        return el, true
 }
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 8a541147..90cf7632 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -28,7 +28,9 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/accesslog"
        "github.com/apache/skywalking-banyandb/pkg/bus"
@@ -56,8 +58,8 @@ func (ms *measureService) activeIngestionAccessLog(root 
string) (err error) {
 }
 
 func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) 
error {
-       reply := func(measure measurev1.MeasureService_WriteServer, logger 
*logger.Logger) {
-               if errResp := measure.Send(&measurev1.WriteResponse{}); errResp 
!= nil {
+       reply := func(metadata *commonv1.Metadata, status modelv1.Status, 
messageId uint64, measure measurev1.MeasureService_WriteServer, logger 
*logger.Logger) {
+               if errResp := measure.Send(&measurev1.WriteResponse{Metadata: 
metadata, Status: status, MessageId: messageId}); errResp != nil {
                        logger.Err(errResp).Msg("failed to send response")
                }
        }
@@ -76,18 +78,28 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                }
                if err != nil {
                        ms.sampled.Error().Err(err).Stringer("written", 
writeRequest).Msg("failed to receive message")
-                       reply(measure, ms.sampled)
-                       continue
+                       return err
                }
                if errTime := 
timestamp.CheckPb(writeRequest.DataPoint.Timestamp); errTime != nil {
                        ms.sampled.Error().Err(errTime).Stringer("written", 
writeRequest).Msg("the data point time is invalid")
-                       reply(measure, ms.sampled)
+                       reply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure, 
ms.sampled)
+                       continue
+               }
+               measureCache, existed := 
ms.entityRepo.getLocator(getID(writeRequest.GetMetadata()))
+               if !existed {
+                       ms.sampled.Error().Err(err).Stringer("written", 
writeRequest).Msg("failed to measure schema not found")
+                       reply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, 
ms.sampled)
+                       continue
+               }
+               if writeRequest.Metadata.ModRevision != 
measureCache.ModRevision {
+                       ms.sampled.Error().Stringer("written", 
writeRequest).Msg("the measure schema is expired")
+                       reply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure, 
ms.sampled)
                        continue
                }
                entity, tagValues, shardID, err := 
ms.navigate(writeRequest.GetMetadata(), 
writeRequest.GetDataPoint().GetTagFamilies())
                if err != nil {
                        ms.sampled.Error().Err(err).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
-                       reply(measure, ms.sampled)
+                       reply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, 
ms.sampled)
                        continue
                }
                if ms.ingestionAccessLog != nil {
@@ -107,7 +119,7 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                if errWritePub != nil {
                        ms.sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to send a message")
                }
-               reply(measure, ms.sampled)
+               reply(nil, modelv1.Status_STATUS_SUCCEED, 
writeRequest.GetMessageId(), measure, ms.sampled)
        }
 }
 
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 93e49138..511d4e9b 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -38,19 +38,25 @@ type streamRegistryServer struct {
 func (rs *streamRegistryServer) Create(ctx context.Context,
        req *databasev1.StreamRegistryServiceCreateRequest,
 ) (*databasev1.StreamRegistryServiceCreateResponse, error) {
-       if err := rs.schemaRegistry.StreamRegistry().CreateStream(ctx, 
req.GetStream()); err != nil {
+       modRevision, err := 
rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream())
+       if err != nil {
                return nil, err
        }
-       return &databasev1.StreamRegistryServiceCreateResponse{}, nil
+       return &databasev1.StreamRegistryServiceCreateResponse{
+               ModRevision: modRevision,
+       }, nil
 }
 
 func (rs *streamRegistryServer) Update(ctx context.Context,
        req *databasev1.StreamRegistryServiceUpdateRequest,
 ) (*databasev1.StreamRegistryServiceUpdateResponse, error) {
-       if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, 
req.GetStream()); err != nil {
+       modRevision, err := 
rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream())
+       if err != nil {
                return nil, err
        }
-       return &databasev1.StreamRegistryServiceUpdateResponse{}, nil
+       return &databasev1.StreamRegistryServiceUpdateResponse{
+               ModRevision: modRevision,
+       }, nil
 }
 
 func (rs *streamRegistryServer) Delete(ctx context.Context,
@@ -287,19 +293,25 @@ type measureRegistryServer struct {
 func (rs *measureRegistryServer) Create(ctx context.Context, req 
*databasev1.MeasureRegistryServiceCreateRequest) (
        *databasev1.MeasureRegistryServiceCreateResponse, error,
 ) {
-       if err := rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, 
req.GetMeasure()); err != nil {
+       modRevision, err := 
rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure())
+       if err != nil {
                return nil, err
        }
-       return &databasev1.MeasureRegistryServiceCreateResponse{}, nil
+       return &databasev1.MeasureRegistryServiceCreateResponse{
+               ModRevision: modRevision,
+       }, nil
 }
 
 func (rs *measureRegistryServer) Update(ctx context.Context, req 
*databasev1.MeasureRegistryServiceUpdateRequest) (
        *databasev1.MeasureRegistryServiceUpdateResponse, error,
 ) {
-       if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, 
req.GetMeasure()); err != nil {
+       modRevision, err := 
rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure())
+       if err != nil {
                return nil, err
        }
-       return &databasev1.MeasureRegistryServiceUpdateResponse{}, nil
+       return &databasev1.MeasureRegistryServiceUpdateResponse{
+               ModRevision: modRevision,
+       }, nil
 }
 
 func (rs *measureRegistryServer) Delete(ctx context.Context, req 
*databasev1.MeasureRegistryServiceDeleteRequest) (
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index e4cc1ed1..d188f975 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -28,6 +28,8 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/accesslog"
@@ -56,8 +58,8 @@ func (s *streamService) activeIngestionAccessLog(root string) 
(err error) {
 }
 
 func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error 
{
-       reply := func(stream streamv1.StreamService_WriteServer, logger 
*logger.Logger) {
-               if errResp := stream.Send(&streamv1.WriteResponse{}); errResp 
!= nil {
+       reply := func(metadata *commonv1.Metadata, status modelv1.Status, 
messageId uint64, stream streamv1.StreamService_WriteServer, logger 
*logger.Logger) {
+               if errResp := stream.Send(&streamv1.WriteResponse{Metadata: 
metadata, Status: status, MessageId: messageId}); errResp != nil {
                        logger.Err(errResp).Msg("failed to send response")
                }
        }
@@ -76,18 +78,28 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                }
                if err != nil {
                        s.sampled.Error().Stringer("written", 
writeEntity).Err(err).Msg("failed to receive message")
-                       reply(stream, s.sampled)
-                       continue
+                       return err
                }
                if errTime := 
timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
                        s.sampled.Error().Stringer("written", 
writeEntity).Err(errTime).Msg("the element time is invalid")
-                       reply(stream, s.sampled)
+                       reply(nil, modelv1.Status_STATUS_INVALID_TIMESTAMP, 
writeEntity.GetMessageId(), stream, s.sampled)
+                       continue
+               }
+               streamCache, existed := 
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+               if !existed {
+                       s.sampled.Error().Err(err).Stringer("written", 
writeEntity).Msg("failed to stream schema not found")
+                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.sampled)
+                       continue
+               }
+               if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
+                       s.sampled.Error().Stringer("written", 
writeEntity).Msg("the stream schema is expired")
+                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, 
s.sampled)
                        continue
                }
                entity, tagValues, shardID, err := 
s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
                if err != nil {
                        s.sampled.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
-                       reply(stream, s.sampled)
+                       reply(nil, modelv1.Status_STATUS_INTERNAL_ERROR, 
writeEntity.GetMessageId(), stream, s.sampled)
                        continue
                }
                if s.ingestionAccessLog != nil {
@@ -108,7 +120,7 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                if errWritePub != nil {
                        s.sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to send a message")
                }
-               reply(stream, s.sampled)
+               reply(nil, modelv1.Status_STATUS_SUCCEED, 
writeEntity.GetMessageId(), stream, s.sampled)
        }
 }
 
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index fe516512..15780b54 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -182,7 +182,8 @@ func (t *topNStreamingProcessor) writeData(eventTime 
time.Time, timeBucket strin
        measureID := group + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
        iwr := &measurev1.InternalWriteRequest{
                Request: &measurev1.WriteRequest{
-                       Metadata: t.topNSchema.GetMetadata(),
+                       MessageId: uint64(time.Now().UnixNano()),
+                       Metadata:  t.topNSchema.GetMetadata(),
                        DataPoint: &measurev1.DataPointValue{
                                Timestamp: timestamppb.New(eventTime),
                                TagFamilies: []*modelv1.TagFamilyForWrite{
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 3235b5c6..5384480f 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -136,6 +136,7 @@ func (s *measure) write(shardID common.ShardID, entity 
[]byte, entityValues tsdb
                        Request: &measurev1.WriteRequest{
                                Metadata:  s.GetSchema().Metadata,
                                DataPoint: value,
+                               MessageId: uint64(time.Now().UnixNano()),
                        },
                        EntityValues: entityValues[1:],
                })
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 26a6b648..053abee3 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -219,7 +219,7 @@ func createOrUpdateTopNMeasure(ctx context.Context, 
measureSchemaRegistry schema
                Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
        }
        if oldTopNSchema == nil {
-               if innerErr := measureSchemaRegistry.CreateMeasure(ctx, 
newTopNMeasure); innerErr != nil {
+               if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, 
newTopNMeasure); innerErr != nil {
                        return nil, innerErr
                }
                return newTopNMeasure, nil
@@ -233,7 +233,7 @@ func createOrUpdateTopNMeasure(ctx context.Context, 
measureSchemaRegistry schema
                return oldTopNSchema, nil
        }
        // update
-       if err = measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure); err 
!= nil {
+       if _, err = measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure); 
err != nil {
                return nil, err
        }
        return newTopNMeasure, nil
diff --git a/banyand/measure/metadata_test.go b/banyand/measure/metadata_test.go
index f6cda0f1..808a123d 100644
--- a/banyand/measure/metadata_test.go
+++ b/banyand/measure/metadata_test.go
@@ -122,7 +122,9 @@ var _ = Describe("Metadata", func() {
                                measureSchema.Entity.TagNames = 
measureSchema.Entity.TagNames[1:]
                                entitySize := len(measureSchema.Entity.TagNames)
 
-                               
Expect(svcs.metadataService.MeasureRegistry().UpdateMeasure(context.TODO(), 
measureSchema)).Should(Succeed())
+                               modRevision, err := 
svcs.metadataService.MeasureRegistry().UpdateMeasure(context.TODO(), 
measureSchema)
+                               Expect(modRevision).ShouldNot(BeZero())
+                               Expect(err).ShouldNot(HaveOccurred())
 
                                Eventually(func() bool {
                                        val, err := 
svcs.measure.Measure(&commonv1.Metadata{
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index f53a718c..d06d865e 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -185,88 +185,92 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key 
string, message proto.
 // update will first ensure the existence of the entity with the metadata,
 // and overwrite the existing value if so.
 // Otherwise, it will return ErrGRPCResourceNotFound.
-func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) 
error {
+func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) 
(int64, error) {
        if !e.closer.AddRunning() {
-               return ErrClosed
+               return 0, ErrClosed
        }
        defer e.closer.Done()
        key, err := metadata.key()
        if err != nil {
-               return err
+               return 0, err
        }
        key = e.prependNamespace(key)
        getResp, err := e.client.Get(ctx, key)
        if err != nil {
-               return err
+               return 0, err
        }
        if getResp.Count > 1 {
-               return errUnexpectedNumberOfEntities
+               return 0, errUnexpectedNumberOfEntities
        }
        val, err := proto.Marshal(metadata.Spec.(proto.Message))
        if err != nil {
-               return err
+               return 0, err
        }
        replace := getResp.Count > 0
-       if replace {
-               existingVal, innerErr := metadata.Kind.Unmarshal(getResp.Kvs[0])
-               if innerErr != nil {
-                       return innerErr
-               }
-               // directly return if we have the same entity
-               if metadata.equal(existingVal) {
-                       return nil
-               }
+       if !replace {
+               return 0, ErrGRPCResourceNotFound
+       }
+       existingVal, innerErr := metadata.Kind.Unmarshal(getResp.Kvs[0])
+       if innerErr != nil {
+               return 0, innerErr
+       }
+       // directly return if we have the same entity
+       if metadata.equal(existingVal) {
+               return 0, nil
+       }
 
-               modRevision := getResp.Kvs[0].ModRevision
-               txnResp, txnErr := e.client.Txn(ctx).
-                       If(clientv3.Compare(clientv3.ModRevision(key), "=", 
modRevision)).
-                       Then(clientv3.OpPut(key, string(val))).
-                       Commit()
-               if txnErr != nil {
-                       return txnErr
-               }
-               if !txnResp.Succeeded {
-                       return errConcurrentModification
-               }
-       } else {
-               return ErrGRPCResourceNotFound
+       modRevision := metadata.ModRevision
+       if modRevision == 0 {
+               modRevision = getResp.Kvs[0].ModRevision
        }
-       return nil
+       txnResp, txnErr := e.client.Txn(ctx).
+               If(clientv3.Compare(clientv3.ModRevision(key), "=", 
modRevision)).
+               Then(clientv3.OpPut(key, string(val))).
+               Commit()
+       if txnErr != nil {
+               return 0, txnErr
+       }
+       if !txnResp.Succeeded {
+               return 0, errConcurrentModification
+       }
+
+       return txnResp.Responses[0].GetResponsePut().Header.Revision, nil
 }
 
 // create will first check existence of the entity with the metadata,
 // and put the value if it does not exist.
 // Otherwise, it will return ErrGRPCAlreadyExists.
-func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) 
error {
+func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) 
(int64, error) {
        if !e.closer.AddRunning() {
-               return ErrClosed
+               return 0, ErrClosed
        }
        defer e.closer.Done()
        key, err := metadata.key()
        if err != nil {
-               return err
+               return 0, err
        }
        key = e.prependNamespace(key)
        getResp, err := e.client.Get(ctx, key)
        if err != nil {
-               return err
+               return 0, err
        }
        if getResp.Count > 1 {
-               return errUnexpectedNumberOfEntities
+               return 0, errUnexpectedNumberOfEntities
        }
        val, err := proto.Marshal(metadata.Spec.(proto.Message))
        if err != nil {
-               return err
+               return 0, err
        }
        replace := getResp.Count > 0
        if replace {
-               return errGRPCAlreadyExists
+               return 0, errGRPCAlreadyExists
        }
-       _, err = e.client.Put(ctx, key, string(val))
+       putResp, err := e.client.Put(ctx, key, string(val))
        if err != nil {
-               return err
+               return 0, err
        }
-       return nil
+
+       return putResp.Header.Revision, nil
 }
 
 func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix 
string, kind Kind) ([]proto.Message, error) {
diff --git a/banyand/metadata/schema/etcd_test.go 
b/banyand/metadata/schema/etcd_test.go
index fc98664f..e4fc3913 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -77,7 +77,7 @@ func preloadSchema(e Registry) error {
        if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil {
                return err
        }
-       err := e.CreateStream(context.Background(), s)
+       _, err := e.CreateStream(context.Background(), s)
        if err != nil {
                return err
        }
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index 5afa54d2..0ef85e59 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -76,23 +76,25 @@ func (e *etcdSchemaRegistry) CreateGroup(ctx 
context.Context, group *commonv1.Gr
        if group.UpdatedAt != nil {
                group.UpdatedAt = timestamppb.Now()
        }
-       return e.create(ctx, Metadata{
+       _, err := e.create(ctx, Metadata{
                TypeMeta: TypeMeta{
                        Kind: KindGroup,
                        Name: group.GetMetadata().GetName(),
                },
                Spec: group,
        })
+       return err
 }
 
 func (e *etcdSchemaRegistry) UpdateGroup(ctx context.Context, group 
*commonv1.Group) error {
-       return e.update(ctx, Metadata{
+       _, err := e.update(ctx, Metadata{
                TypeMeta: TypeMeta{
                        Kind: KindGroup,
                        Name: group.GetMetadata().GetName(),
                },
                Spec: group,
        })
+       return err
 }
 
 func formatGroupKey(group string) string {
diff --git a/banyand/metadata/schema/index.go b/banyand/metadata/schema/index.go
index 41ff541b..41b1b284 100644
--- a/banyand/metadata/schema/index.go
+++ b/banyand/metadata/schema/index.go
@@ -59,7 +59,7 @@ func (e *etcdSchemaRegistry) CreateIndexRuleBinding(ctx 
context.Context, indexRu
        if indexRuleBinding.UpdatedAt != nil {
                indexRuleBinding.UpdatedAt = timestamppb.Now()
        }
-       return e.create(ctx, Metadata{
+       _, err := e.create(ctx, Metadata{
                TypeMeta: TypeMeta{
                        Kind:  KindIndexRuleBinding,
                        Name:  indexRuleBinding.GetMetadata().GetName(),
@@ -67,10 +67,11 @@ func (e *etcdSchemaRegistry) CreateIndexRuleBinding(ctx 
context.Context, indexRu
                },
                Spec: indexRuleBinding,
        })
+       return err
 }
 
 func (e *etcdSchemaRegistry) UpdateIndexRuleBinding(ctx context.Context, 
indexRuleBinding *databasev1.IndexRuleBinding) error {
-       return e.update(ctx, Metadata{
+       _, err := e.update(ctx, Metadata{
                TypeMeta: TypeMeta{
                        Kind:  KindIndexRuleBinding,
                        Name:  indexRuleBinding.GetMetadata().GetName(),
@@ -78,6 +79,7 @@ func (e *etcdSchemaRegistry) UpdateIndexRuleBinding(ctx 
context.Context, indexRu
                },
                Spec: indexRuleBinding,
        })
+       return err
 }
 
 func (e *etcdSchemaRegistry) DeleteIndexRuleBinding(ctx context.Context, 
metadata *commonv1.Metadata) (bool, error) {
@@ -129,7 +131,7 @@ func (e *etcdSchemaRegistry) CreateIndexRule(ctx 
context.Context, indexRule *dat
                buf = append(buf, indexRule.Metadata.Name...)
                indexRule.Metadata.Id = crc32.ChecksumIEEE(buf)
        }
-       return e.create(ctx, Metadata{
+       _, err := e.create(ctx, Metadata{
                TypeMeta: TypeMeta{
                        Kind:  KindIndexRule,
                        Name:  indexRule.GetMetadata().GetName(),
@@ -137,6 +139,7 @@ func (e *etcdSchemaRegistry) CreateIndexRule(ctx 
context.Context, indexRule *dat
                },
                Spec: indexRule,
        })
+       return err
 }
 
 func (e *etcdSchemaRegistry) UpdateIndexRule(ctx context.Context, indexRule 
*databasev1.IndexRule) error {
@@ -147,7 +150,7 @@ func (e *etcdSchemaRegistry) UpdateIndexRule(ctx 
context.Context, indexRule *dat
                }
                indexRule.Metadata.Id = existingIndexRule.Metadata.Id
        }
-       return e.update(ctx, Metadata{
+       _, err := e.update(ctx, Metadata{
                TypeMeta: TypeMeta{
                        Kind:  KindIndexRule,
                        Name:  indexRule.GetMetadata().GetName(),
@@ -155,6 +158,7 @@ func (e *etcdSchemaRegistry) UpdateIndexRule(ctx 
context.Context, indexRule *dat
                },
                Spec: indexRule,
        })
+       return err
 }
 
 func (e *etcdSchemaRegistry) DeleteIndexRule(ctx context.Context, metadata 
*commonv1.Metadata) (bool, error) {
diff --git a/banyand/metadata/schema/measure.go 
b/banyand/metadata/schema/measure.go
index 5b0b6f50..a17f8fcd 100644
--- a/banyand/metadata/schema/measure.go
+++ b/banyand/metadata/schema/measure.go
@@ -56,13 +56,13 @@ func (e *etcdSchemaRegistry) ListMeasure(ctx 
context.Context, opt ListOpt) ([]*d
        return entities, nil
 }
 
-func (e *etcdSchemaRegistry) CreateMeasure(ctx context.Context, measure 
*databasev1.Measure) error {
+func (e *etcdSchemaRegistry) CreateMeasure(ctx context.Context, measure 
*databasev1.Measure) (int64, error) {
        if measure.UpdatedAt != nil {
                measure.UpdatedAt = timestamppb.Now()
        }
        if measure.GetInterval() != "" {
                if _, err := timestamp.ParseDuration(measure.GetInterval()); 
err != nil {
-                       return errors.Wrap(err, "interval is malformed")
+                       return 0, errors.Wrap(err, "interval is malformed")
                }
        }
        return e.create(ctx, Metadata{
@@ -75,17 +75,18 @@ func (e *etcdSchemaRegistry) CreateMeasure(ctx 
context.Context, measure *databas
        })
 }
 
-func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure 
*databasev1.Measure) error {
+func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure 
*databasev1.Measure) (int64, error) {
        if measure.GetInterval() != "" {
                if _, err := timestamp.ParseDuration(measure.GetInterval()); 
err != nil {
-                       return errors.Wrap(err, "interval is malformed")
+                       return 0, errors.Wrap(err, "interval is malformed")
                }
        }
        return e.update(ctx, Metadata{
                TypeMeta: TypeMeta{
-                       Kind:  KindMeasure,
-                       Group: measure.GetMetadata().GetGroup(),
-                       Name:  measure.GetMetadata().GetName(),
+                       Kind:        KindMeasure,
+                       Group:       measure.GetMetadata().GetGroup(),
+                       Name:        measure.GetMetadata().GetName(),
+                       ModRevision: measure.GetMetadata().GetModRevision(),
                },
                Spec: measure,
        })
diff --git a/banyand/metadata/schema/property.go 
b/banyand/metadata/schema/property.go
index bb76f0a8..1456199a 100644
--- a/banyand/metadata/schema/property.go
+++ b/banyand/metadata/schema/property.go
@@ -101,7 +101,7 @@ func (e *etcdSchemaRegistry) ApplyProperty(ctx 
context.Context, property *proper
                Spec: property,
        }
        tagsNum := uint32(len(property.Tags))
-       err := e.create(ctx, md)
+       _, err := e.create(ctx, md)
        if err == nil {
                return true, tagsNum, nil
        }
@@ -125,7 +125,7 @@ func (e *etcdSchemaRegistry) ApplyProperty(ctx 
context.Context, property *proper
                existed.Tags = append(existed.Tags, property.Tags...)
                md.Spec = existed
        }
-       if err = e.update(ctx, md); err != nil {
+       if _, err = e.update(ctx, md); err != nil {
                return false, 0, err
        }
        return false, tagsNum, nil
diff --git a/banyand/metadata/schema/schema.go 
b/banyand/metadata/schema/schema.go
index d5dc0185..6806b1a5 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -60,9 +60,10 @@ type Registry interface {
 
 // TypeMeta defines the identity and type of an Event.
 type TypeMeta struct {
-       Name  string
-       Group string
-       Kind  Kind
+       Name        string
+       Group       string
+       ModRevision int64
+       Kind        Kind
 }
 
 // Metadata wrap dedicated serialized resource and its TypeMeta.
@@ -136,8 +137,8 @@ func (m Metadata) equal(other Metadata) bool {
 type Stream interface {
        GetStream(ctx context.Context, metadata *commonv1.Metadata) 
(*databasev1.Stream, error)
        ListStream(ctx context.Context, opt ListOpt) ([]*databasev1.Stream, 
error)
-       CreateStream(ctx context.Context, stream *databasev1.Stream) error
-       UpdateStream(ctx context.Context, stream *databasev1.Stream) error
+       CreateStream(ctx context.Context, stream *databasev1.Stream) (int64, 
error)
+       UpdateStream(ctx context.Context, stream *databasev1.Stream) (int64, 
error)
        DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool, 
error)
 }
 
@@ -163,8 +164,8 @@ type IndexRuleBinding interface {
 type Measure interface {
        GetMeasure(ctx context.Context, metadata *commonv1.Metadata) 
(*databasev1.Measure, error)
        ListMeasure(ctx context.Context, opt ListOpt) ([]*databasev1.Measure, 
error)
-       CreateMeasure(ctx context.Context, measure *databasev1.Measure) error
-       UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error
+       CreateMeasure(ctx context.Context, measure *databasev1.Measure) (int64, 
error)
+       UpdateMeasure(ctx context.Context, measure *databasev1.Measure) (int64, 
error)
        DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool, 
error)
        TopNAggregations(ctx context.Context, metadata *commonv1.Metadata) 
([]*databasev1.TopNAggregation, error)
 }
diff --git a/banyand/metadata/schema/shard.go b/banyand/metadata/schema/shard.go
index 8f1c7a42..c6499931 100644
--- a/banyand/metadata/schema/shard.go
+++ b/banyand/metadata/schema/shard.go
@@ -41,14 +41,15 @@ func (e *etcdSchemaRegistry) CreateOrUpdateShard(ctx 
context.Context, shard *dat
                },
                Spec: shard,
        }
-       err := e.update(ctx, md)
+       _, err := e.update(ctx, md)
        if err == nil {
                return nil
        }
        if errors.Is(err, ErrGRPCResourceNotFound) {
                shard.CreatedAt = shard.UpdatedAt
                md.Spec = shard
-               return e.create(ctx, md)
+               _, err = e.create(ctx, md)
+               return err
        }
        return err
 }
diff --git a/banyand/metadata/schema/stream.go 
b/banyand/metadata/schema/stream.go
index d2e2491f..645a635d 100644
--- a/banyand/metadata/schema/stream.go
+++ b/banyand/metadata/schema/stream.go
@@ -51,25 +51,26 @@ func (e *etcdSchemaRegistry) ListStream(ctx 
context.Context, opt ListOpt) ([]*da
        return entities, nil
 }
 
-func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream 
*databasev1.Stream) error {
+func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream 
*databasev1.Stream) (int64, error) {
        return e.update(ctx, Metadata{
                TypeMeta: TypeMeta{
-                       Kind:  KindStream,
-                       Group: stream.GetMetadata().GetGroup(),
-                       Name:  stream.GetMetadata().GetName(),
+                       Kind:        KindStream,
+                       Group:       stream.GetMetadata().GetGroup(),
+                       Name:        stream.GetMetadata().GetName(),
+                       ModRevision: stream.GetMetadata().GetModRevision(),
                },
                Spec: stream,
        })
 }
 
-func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream 
*databasev1.Stream) error {
+func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream 
*databasev1.Stream) (int64, error) {
        if stream.UpdatedAt != nil {
                stream.UpdatedAt = timestamppb.Now()
        }
        group := stream.Metadata.GetGroup()
        _, err := e.GetGroup(ctx, group)
        if err != nil {
-               return err
+               return 0, err
        }
        return e.create(ctx, Metadata{
                TypeMeta: TypeMeta{
diff --git a/banyand/metadata/schema/topn.go b/banyand/metadata/schema/topn.go
index 858fdf04..60597f1a 100644
--- a/banyand/metadata/schema/topn.go
+++ b/banyand/metadata/schema/topn.go
@@ -55,7 +55,7 @@ func (e *etcdSchemaRegistry) CreateTopNAggregation(ctx 
context.Context, topNAggr
        if topNAggregation.UpdatedAt != nil {
                topNAggregation.UpdatedAt = timestamppb.Now()
        }
-       return e.create(ctx, Metadata{
+       _, err := e.create(ctx, Metadata{
                TypeMeta: TypeMeta{
                        Kind:  KindTopNAggregation,
                        Group: topNAggregation.GetMetadata().GetGroup(),
@@ -63,10 +63,11 @@ func (e *etcdSchemaRegistry) CreateTopNAggregation(ctx 
context.Context, topNAggr
                },
                Spec: topNAggregation,
        })
+       return err
 }
 
 func (e *etcdSchemaRegistry) UpdateTopNAggregation(ctx context.Context, 
topNAggregation *databasev1.TopNAggregation) error {
-       return e.update(ctx, Metadata{
+       _, err := e.update(ctx, Metadata{
                TypeMeta: TypeMeta{
                        Kind:  KindTopNAggregation,
                        Group: topNAggregation.GetMetadata().GetGroup(),
@@ -74,6 +75,7 @@ func (e *etcdSchemaRegistry) UpdateTopNAggregation(ctx 
context.Context, topNAggr
                },
                Spec: topNAggregation,
        })
+       return err
 }
 
 func (e *etcdSchemaRegistry) DeleteTopNAggregation(ctx context.Context, 
metadata *commonv1.Metadata) (bool, error) {
diff --git a/banyand/metadata/schema/watcher_test.go 
b/banyand/metadata/schema/watcher_test.go
index a796c4bb..cc5b2f45 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -118,13 +118,15 @@ var _ = ginkgo.Describe("Watcher", func() {
                        },
                })
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               var modRevision int64
                for i := 0; i < 2; i++ {
-                       err = registry.CreateMeasure(context.Background(), 
&databasev1.Measure{
+                       modRevision, err = 
registry.CreateMeasure(context.Background(), &databasev1.Measure{
                                Metadata: &commonv1.Metadata{
                                        Name:  fmt.Sprintf("testkey%d", i+1),
                                        Group: "testgroup-measure",
                                },
                        })
+                       gomega.Expect(modRevision).ShouldNot(gomega.BeZero())
                        gomega.Expect(err).NotTo(gomega.HaveOccurred())
                }
 
@@ -156,12 +158,14 @@ var _ = ginkgo.Describe("Watcher", func() {
                        },
                })
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
-               err = registry.CreateStream(context.Background(), 
&databasev1.Stream{
+               var modRevision int64
+               modRevision, err = registry.CreateStream(context.Background(), 
&databasev1.Stream{
                        Metadata: &commonv1.Metadata{
                                Name:  "testkey",
                                Group: "testgroup-stream",
                        },
                })
+               gomega.Expect(modRevision).ShouldNot(gomega.BeZero())
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
                ok, err := registry.DeleteStream(context.Background(), 
&commonv1.Metadata{
                        Name:  "testkey",
diff --git a/banyand/stream/metadata_test.go b/banyand/stream/metadata_test.go
index 8c4f3bfb..f0133ea5 100644
--- a/banyand/stream/metadata_test.go
+++ b/banyand/stream/metadata_test.go
@@ -122,7 +122,9 @@ var _ = Describe("Metadata", func() {
                                streamSchema.Entity.TagNames = 
streamSchema.Entity.TagNames[1:]
                                entitySize := len(streamSchema.Entity.TagNames)
 
-                               
Expect(svcs.metadataService.StreamRegistry().UpdateStream(context.TODO(), 
streamSchema)).Should(Succeed())
+                               modRevision, err := 
svcs.metadataService.StreamRegistry().UpdateStream(context.TODO(), streamSchema)
+                               Expect(modRevision).ShouldNot(BeZero())
+                               Expect(err).ShouldNot(HaveOccurred())
 
                                Eventually(func() bool {
                                        val, ok := 
svcs.stream.schemaRepo.loadStream(&commonv1.Metadata{
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 96379185..0ac80eac 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -168,6 +168,9 @@
     - [TopNRequest](#banyandb-measure-v1-TopNRequest)
     - [TopNResponse](#banyandb-measure-v1-TopNResponse)
   
+- [banyandb/model/v1/write.proto](#banyandb_model_v1_write-proto)
+    - [Status](#banyandb-model-v1-Status)
+  
 - [banyandb/measure/v1/write.proto](#banyandb_measure_v1_write-proto)
     - [DataPointValue](#banyandb-measure-v1-DataPointValue)
     - [InternalWriteRequest](#banyandb-measure-v1-InternalWriteRequest)
@@ -1671,6 +1674,11 @@ Type determine the index structure under the hood
 
 
 
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) |  |  |
+
+
 
 
 
@@ -1817,6 +1825,11 @@ Type determine the index structure under the hood
 
 
 
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) |  |  |
+
+
 
 
 
@@ -1842,6 +1855,11 @@ Type determine the index structure under the hood
 
 
 
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) |  |  |
+
+
 
 
 
@@ -1988,6 +2006,11 @@ Type determine the index structure under the hood
 
 
 
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| mod_revision | [int64](#int64) |  |  |
+
+
 
 
 
@@ -2498,6 +2521,38 @@ TopNResponse is the response for a query to the Query 
module.
 
 
 
+<a name="banyandb_model_v1_write-proto"></a>
+<p align="right"><a href="#top">Top</a></p>
+
+## banyandb/model/v1/write.proto
+
+
+ 
+
+
+<a name="banyandb-model-v1-Status"></a>
+
+### Status
+Status is the response status for write
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| STATUS_UNSPECIFIED | 0 |  |
+| STATUS_SUCCEED | 1 |  |
+| STATUS_INVALID_TIMESTAMP | 2 |  |
+| STATUS_NOT_FOUND | 3 |  |
+| STATUS_EXPIRED_SCHEMA | 4 |  |
+| STATUS_INTERNAL_ERROR | 5 |  |
+
+
+ 
+
+ 
+
+ 
+
+
+
 <a name="banyandb_measure_v1_write-proto"></a>
 <p align="right"><a href="#top">Top</a></p>
 
@@ -2550,6 +2605,7 @@ WriteRequest is the request contract for write
 | ----- | ---- | ----- | ----------- |
 | metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  | 
the metadata is required. |
 | data_point | [DataPointValue](#banyandb-measure-v1-DataPointValue) |  | the 
data_point is required. |
+| message_id | [uint64](#uint64) |  | the message_id is required. |
 
 
 
@@ -2562,6 +2618,13 @@ WriteRequest is the request contract for write
 WriteResponse is the response contract for write
 
 
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| message_id | [uint64](#uint64) |  | the message_id from request. |
+| status | [banyandb.model.v1.Status](#banyandb-model-v1-Status) |  | status 
indicates the request processing result |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  | 
the metadata from request when request fails |
+
+
 
 
 
@@ -2942,8 +3005,9 @@ QueryResponse is the response for a query to the Query 
module.
 
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
-| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  | 
the metadata is only required in the first write. |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  | 
the metadata is required. |
 | element | [ElementValue](#banyandb-stream-v1-ElementValue) |  | the element 
is required. |
+| message_id | [uint64](#uint64) |  | the message_id is required. |
 
 
 
@@ -2956,6 +3020,13 @@ QueryResponse is the response for a query to the Query 
module.
 
 
 
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| message_id | [uint64](#uint64) |  | the message_id from request. |
+| status | [banyandb.model.v1.Status](#banyandb-model-v1-Status) |  | status 
indicates the request processing result |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  | 
the metadata from request when request fails |
+
+
 
 
 
diff --git a/pkg/partition/entity.go b/pkg/partition/entity.go
index f5a86100..22433fc6 100644
--- a/pkg/partition/entity.go
+++ b/pkg/partition/entity.go
@@ -33,7 +33,10 @@ import (
 var ErrMalformedElement = errors.New("element is malformed")
 
 // EntityLocator combines several TagLocators that help find the entity value.
-type EntityLocator []TagLocator
+type EntityLocator struct {
+       TagLocators []TagLocator
+       ModRevision int64
+}
 
 // TagLocator contains offsets to retrieve a tag swiftly.
 type TagLocator struct {
@@ -42,22 +45,22 @@ type TagLocator struct {
 }
 
 // NewEntityLocator return a EntityLocator based on tag family spec and entity 
spec.
-func NewEntityLocator(families []*databasev1.TagFamilySpec, entity 
*databasev1.Entity) EntityLocator {
-       locator := make(EntityLocator, 0, len(entity.GetTagNames()))
+func NewEntityLocator(families []*databasev1.TagFamilySpec, entity 
*databasev1.Entity, modRevision int64) EntityLocator {
+       locator := make([]TagLocator, 0, len(entity.GetTagNames()))
        for _, tagInEntity := range entity.GetTagNames() {
                fIndex, tIndex, tag := pbv1.FindTagByName(families, tagInEntity)
                if tag != nil {
                        locator = append(locator, TagLocator{FamilyOffset: 
fIndex, TagOffset: tIndex})
                }
        }
-       return locator
+       return EntityLocator{TagLocators: locator, ModRevision: modRevision}
 }
 
 // Find the entity from a tag family, prepend a subject to the entity.
 func (e EntityLocator) Find(subject string, value 
[]*modelv1.TagFamilyForWrite) (tsdb.Entity, tsdb.EntityValues, error) {
-       entityValues := make(tsdb.EntityValues, len(e)+1)
+       entityValues := make(tsdb.EntityValues, len(e.TagLocators)+1)
        entityValues[0] = tsdb.StrValue(subject)
-       for i, index := range e {
+       for i, index := range e.TagLocators {
                tag, err := GetTagByOffset(value, index.FamilyOffset, 
index.TagOffset)
                if err != nil {
                        return nil, nil, err
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 4297c296..9d197df4 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -39,7 +39,6 @@ var (
        errUnsupportedConditionValue = errors.New("unsupported condition value 
type")
        errInvalidCriteriaType       = errors.New("invalid criteria type")
        errIndexNotDefined           = errors.New("index is not define for the 
tag")
-       errInvalidData               = errors.New("data is invalid")
 
        nullTag = &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
 )
@@ -60,22 +59,30 @@ func ProjectItem(ec executor.ExecutionContext, item 
tsdb.Item, projectionFieldRe
                if len(refs) == 0 {
                        continue
                }
-               tags := make([]*modelv1.Tag, len(refs))
                familyName := refs[0].Tag.getFamilyName()
                parsedTagFamily, err := ec.ParseTagFamily(familyName, item)
                if err != nil {
                        return nil, errors.WithMessage(err, "parse projection")
                }
-               if len(refs) > len(parsedTagFamily.Tags) {
-                       return nil, errors.Wrapf(errInvalidData,
-                               "the number of tags %d in %s is less then 
expected %d",
-                               len(parsedTagFamily.Tags), familyName, 
len(refs))
+
+               parsedTagSize := len(parsedTagFamily.GetTags())
+               tagRefSize := len(refs)
+
+               // Determine maximum size for creating the tags slice
+               maxSize := tagRefSize
+               if parsedTagSize < tagRefSize {
+                       maxSize = parsedTagSize
                }
+
+               tags := make([]*modelv1.Tag, maxSize)
+
                for j, ref := range refs {
-                       if len(parsedTagFamily.GetTags()) > ref.Spec.TagIdx {
+                       if parsedTagSize > ref.Spec.TagIdx {
                                tags[j] = 
parsedTagFamily.GetTags()[ref.Spec.TagIdx]
-                       } else {
+                       } else if j < parsedTagSize {
                                tags[j] = &modelv1.Tag{Key: ref.Tag.name, 
Value: nullTag}
+                       } else {
+                               break
                        }
                }
 
diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go
index 18e48846..e647cf9f 100644
--- a/pkg/test/measure/etcd.go
+++ b/pkg/test/measure/etcd.go
@@ -51,7 +51,8 @@ func PreloadSchema(ctx context.Context, e schema.Registry) 
error {
                return errors.WithStack(err)
        }
        if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure 
*databasev1.Measure) error {
-               return e.CreateMeasure(ctx, measure)
+               _, innerErr := e.CreateMeasure(ctx, measure)
+               return innerErr
        }); err != nil {
                return errors.WithStack(err)
        }
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index e7557c31..b796ef01 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -57,7 +57,7 @@ func PreloadSchema(ctx context.Context, e schema.Registry) 
error {
        if unmarshalErr := protojson.Unmarshal([]byte(streamJSON), s); 
unmarshalErr != nil {
                return unmarshalErr
        }
-       if innerErr := e.CreateStream(ctx, s); innerErr != nil {
+       if _, innerErr := e.CreateStream(ctx, s); innerErr != nil {
                return innerErr
        }
 
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 04316efe..8405152f 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -35,6 +35,7 @@ import (
        "sigs.k8s.io/yaml"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
@@ -105,7 +106,7 @@ func loadData(md *commonv1.Metadata, measure 
measurev1.MeasureService_WriteClien
                dataPointValue := &measurev1.DataPointValue{}
                gm.Expect(protojson.Unmarshal(rawDataPointValue, 
dataPointValue)).ShouldNot(gm.HaveOccurred())
                dataPointValue.Timestamp = 
timestamppb.New(baseTime.Add(-time.Duration(len(templates)-i-1) * interval))
-               gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, 
DataPoint: dataPointValue})).
+               gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, 
DataPoint: dataPointValue, MessageId: uint64(time.Now().UnixNano())})).
                        Should(gm.Succeed())
        }
 }
@@ -114,14 +115,21 @@ func loadData(md *commonv1.Metadata, measure 
measurev1.MeasureService_WriteClien
 func Write(conn *grpclib.ClientConn, name, group, dataFile string,
        baseTime time.Time, interval time.Duration,
 ) {
+       metadata := &commonv1.Metadata{
+               Name:  name,
+               Group: group,
+       }
+
+       schema := databasev1.NewMeasureRegistryServiceClient(conn)
+       resp, err := schema.Get(context.Background(), 
&databasev1.MeasureRegistryServiceGetRequest{Metadata: metadata})
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       metadata = resp.GetMeasure().GetMetadata()
+
        c := measurev1.NewMeasureServiceClient(conn)
        ctx := context.Background()
        writeClient, err := c.Write(ctx)
        gm.Expect(err).NotTo(gm.HaveOccurred())
-       loadData(&commonv1.Metadata{
-               Name:  name,
-               Group: group,
-       }, writeClient, dataFile, baseTime, interval)
+       loadData(metadata, writeClient, dataFile, baseTime, interval)
        gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
        gm.Eventually(func() error {
                _, err := writeClient.Recv()
diff --git a/test/cases/stream/data/data.go b/test/cases/stream/data/data.go
index cb6cbdee..4e48d18c 100644
--- a/test/cases/stream/data/data.go
+++ b/test/cases/stream/data/data.go
@@ -37,6 +37,7 @@ import (
        "sigs.k8s.io/yaml"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
@@ -97,12 +98,13 @@ var VerifyFn = func(innerGm gm.Gomega, sharedContext 
helpers.SharedContext, args
                })
 }
 
-func loadData(stream streamv1.StreamService_WriteClient, dataFile string, 
baseTime time.Time, interval time.Duration) {
+func loadData(stream streamv1.StreamService_WriteClient, metadata 
*commonv1.Metadata, dataFile string, baseTime time.Time, interval 
time.Duration) {
        var templates []interface{}
        content, err := dataFS.ReadFile("testdata/" + dataFile)
        gm.Expect(err).ShouldNot(gm.HaveOccurred())
        gm.Expect(json.Unmarshal(content, 
&templates)).ShouldNot(gm.HaveOccurred())
        bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+
        for i, template := range templates {
                rawSearchTagFamily, errMarshal := json.Marshal(template)
                gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
@@ -125,11 +127,9 @@ func loadData(stream streamv1.StreamService_WriteClient, 
dataFile string, baseTi
                }
                e.TagFamilies = append(e.TagFamilies, searchTagFamily)
                errInner := stream.Send(&streamv1.WriteRequest{
-                       Metadata: &commonv1.Metadata{
-                               Name:  "sw",
-                               Group: "default",
-                       },
-                       Element: e,
+                       Metadata:  metadata,
+                       Element:   e,
+                       MessageId: uint64(time.Now().UnixNano()),
                })
                gm.Expect(errInner).ShouldNot(gm.HaveOccurred())
        }
@@ -137,11 +137,20 @@ func loadData(stream streamv1.StreamService_WriteClient, 
dataFile string, baseTi
 
 // Write data into the server.
 func Write(conn *grpclib.ClientConn, dataFile string, baseTime time.Time, 
interval time.Duration) {
+       metadata := &commonv1.Metadata{
+               Name:  "sw",
+               Group: "default",
+       }
+       schema := databasev1.NewStreamRegistryServiceClient(conn)
+       resp, err := schema.Get(context.Background(), 
&databasev1.StreamRegistryServiceGetRequest{Metadata: metadata})
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       metadata = resp.GetStream().GetMetadata()
+
        c := streamv1.NewStreamServiceClient(conn)
        ctx := context.Background()
        writeClient, err := c.Write(ctx)
        gm.Expect(err).NotTo(gm.HaveOccurred())
-       loadData(writeClient, dataFile, baseTime, interval)
+       loadData(writeClient, metadata, dataFile, baseTime, interval)
        gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
        gm.Eventually(func() error {
                _, err := writeClient.Recv()
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index adcd5c08..98f0a428 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -33,7 +33,7 @@ services:
       - sw_agent:/skywalking-java-agent
 
   oap:
-    image: "ghcr.io/apache/skywalking/oap:${SW_OAP_COMMIT}"
+    image: 
"ghcr.io/apache/skywalking-banyandb-java-client/oap:${SW_OAP_COMMIT}"
     expose:
       - 11800
       - 12800
@@ -55,7 +55,7 @@ services:
       retries: 120
 
   ui:
-    image: "ghcr.io/apache/skywalking/ui:${SW_OAP_COMMIT}"
+    image: "ghcr.io/apache/skywalking-banyandb-java-client/ui:${SW_OAP_COMMIT}"
     expose:
       - 8080
     environment:
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 57dc048e..c1d3d0a0 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -25,5 +25,5 @@ 
SW_KUBERNETES_COMMIT_SHA=e2c61c6774cf377b23516fca6f8a1e119d3191c5
 SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
 SW_CTL_COMMIT=6b2eb0011e38b630db6af7203db215806bd141ed
 
-SW_OAP_COMMIT=5ad74eb2619b4cebe57a150ca328b2c1d052cb54
+SW_OAP_COMMIT=bb830ee6d3c992aca128eeaf2f8841d2776faf2e
 SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9
diff --git a/test/stress/cases/istio/istio_suite_test.go 
b/test/stress/cases/istio/istio_suite_test.go
index 88adad25..a2a2a5b6 100644
--- a/test/stress/cases/istio/istio_suite_test.go
+++ b/test/stress/cases/istio/istio_suite_test.go
@@ -150,6 +150,7 @@ func ReadAndWriteFromFile(filePath string, conn 
*grpc.ClientConn) error {
                                return fmt.Errorf("failed to unmarshal JSON 
message: %w", errUnmarshal)
                        }
 
+                       req.MessageId = uint64(time.Now().UnixNano())
                        req.DataPoint.Timestamp = 
timestamppb.New(adjustTime(req.DataPoint.Timestamp.AsTime()))
                        // Write the request to the measureService
                        if errSend := client.Send(&req); errSend != nil {
diff --git a/test/stress/cases/istio/repo.go b/test/stress/cases/istio/repo.go
index 6f5fe011..6865ba60 100644
--- a/test/stress/cases/istio/repo.go
+++ b/test/stress/cases/istio/repo.go
@@ -147,7 +147,8 @@ func (p *preloadService) PreRun(_ context.Context) error {
                return errors.WithStack(err)
        }
        if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure 
*databasev1.Measure) error {
-               return e.CreateMeasure(context.TODO(), measure)
+               _, innerErr := e.CreateMeasure(context.TODO(), measure)
+               return innerErr
        }); err != nil {
                return errors.WithStack(err)
        }
diff --git a/test/stress/env b/test/stress/env
index 1ff35800..91076f83 100644
--- a/test/stress/env
+++ b/test/stress/env
@@ -26,7 +26,7 @@ 
SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
 SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
 SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
 
-SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0
+SW_OAP_COMMIT=bb830ee6d3c992aca128eeaf2f8841d2776faf2e
 SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
 
 TARGET=test
diff --git a/test/stress/env.dev b/test/stress/env.dev
index 604ded57..febe0637 100644
--- a/test/stress/env.dev
+++ b/test/stress/env.dev
@@ -25,7 +25,7 @@ 
SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
 SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
 SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
 
-SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0
+SW_OAP_COMMIT=bb830ee6d3c992aca128eeaf2f8841d2776faf2e
 SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
 
 TARGET=dev
diff --git a/ui/src/components/Editor/index.vue 
b/ui/src/components/Editor/index.vue
index 5c4851a3..4f5a4daa 100644
--- a/ui/src/components/Editor/index.vue
+++ b/ui/src/components/Editor/index.vue
@@ -55,6 +55,7 @@ const data = reactive({
     form: {
         group: route.params.group,
         name: route.params.group,
+        modRevision: route.params.modRevision,
         interval: 1,
         intervalUnit: 'ns'
     }
@@ -98,6 +99,7 @@ const options = [
 watch(() => route, () => {
     data.form.group = route.params.group
     data.form.name = route.params.name
+    data.form.modRevision = route.params.modRevision
     data.type = route.params.type + ''
     data.operator = route.params.operator
     initData()
@@ -149,7 +151,8 @@ const submit = async (formEl: FormInstance | undefined) => {
             const form = {
                 metadata: {
                     group: data.form.group,
-                    name: data.form.name
+                    name: data.form.name,
+                    modRevision: data.form.modRevision
                 },
                 tagFamilies: tagFamilies,
                 entity: {
@@ -262,6 +265,7 @@ function initData() {
                         data.form.intervalUnit = intervalUnit
                         fieldEditorRef.value.setFields(fields)
                     }
+                    data.form.modRevision = res.data[data.type + 
''].metadata.modRevision
                 }
             })
             .finally(() => {

Reply via email to