This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new ccbfd1f6 Support writing stream and trace data with specifications 
(#888)
ccbfd1f6 is described below

commit ccbfd1f6ac4b2c342362dae8b524a0f2b9a80cd7
Author: Huang Youliang <[email protected]>
AuthorDate: Fri Dec 12 09:42:16 2025 +0800

    Support writing stream and trace data with specifications (#888)
    
    * Support writing stream and trace data with specifications
    
    Co-authored-by: Gao Hongtao <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 CHANGES.md                                         |   1 +
 banyand/liaison/grpc/discovery.go                  |  67 +++--
 banyand/liaison/grpc/locator.go                    | 120 +++++++++
 banyand/liaison/grpc/measure.go                    | 183 ++++---------
 banyand/liaison/grpc/stream.go                     | 167 ++++++++----
 banyand/liaison/grpc/trace.go                      | 289 ++++++++++++++-------
 banyand/metadata/schema/etcd.go                    |   8 +-
 banyand/stream/write_liaison.go                    |  24 +-
 banyand/stream/write_standalone.go                 |  99 +++++--
 banyand/trace/write_liaison.go                     |  31 ++-
 banyand/trace/write_standalone.go                  | 126 +++++----
 pkg/test/stream/testdata/group.json                |  21 +-
 pkg/test/stream/testdata/group_with_stages.json    |  33 +++
 .../testdata/index_rule_bindings/sw_spec.json      |  27 ++
 pkg/test/stream/testdata/streams/sw_spec.json      |  95 +++++++
 .../trace/testdata/groups/test-trace-spec.json     |  19 ++
 .../testdata/groups_stages/test-trace-spec.json    |  34 +++
 .../testdata/index_rule_bindings/sw_spec.json      |  14 +
 .../trace/testdata/index_rules/duration_spec.json  |  14 +
 .../trace/testdata/index_rules/timestamp_spec.json |  14 +
 pkg/test/trace/testdata/traces/sw_spec.json        |  45 ++++
 test/cases/init.go                                 |  42 +++
 test/cases/stream/data/data.go                     |  68 +++++
 test/cases/stream/data/input/write_spec.ql         |  21 ++
 test/cases/stream/data/input/write_spec.yaml       |  31 +++
 test/cases/stream/data/testdata/sw_spec_order.json |  66 +++++
 .../cases/stream/data/testdata/sw_spec_order2.json |  45 ++++
 test/cases/stream/data/want/write_spec.yaml        |  65 +++++
 test/cases/stream/stream.go                        |   1 +
 test/cases/trace/data/data.go                      |  88 +++++++
 test/cases/trace/data/input/write_spec.ql          |  21 ++
 test/cases/trace/data/input/write_spec.yml         |  28 ++
 test/cases/trace/data/testdata/sw_spec_order.json  | 123 +++++++++
 test/cases/trace/data/testdata/sw_spec_order2.json |  83 ++++++
 test/cases/trace/data/want/write_spec.yml          |  96 +++++++
 test/cases/trace/trace.go                          |   1 +
 36 files changed, 1827 insertions(+), 383 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 2993a4e4..966470fd 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -12,6 +12,7 @@ Release Notes.
 - Add sorted query support for the Property.
 - Update bydbQL to add sorted query support for the Property.
 - Remove the windows arch for binary and docker image.
+- Support writing data with specifications.
 
 ### Bug Fixes
 
diff --git a/banyand/liaison/grpc/discovery.go 
b/banyand/liaison/grpc/discovery.go
index abeb9f99..db6953dd 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -50,6 +50,7 @@ func newDiscoveryService(kind schema.Kind, metadataRepo 
metadata.Repo, nodeRegis
        er := &entityRepo{
                entitiesMap:     make(map[identity]partition.Locator),
                measureMap:      make(map[identity]*databasev1.Measure),
+               streamMap:       make(map[identity]*databasev1.Stream),
                traceMap:        make(map[identity]*databasev1.Trace),
                traceIDIndexMap: make(map[identity]int),
        }
@@ -83,27 +84,45 @@ func (ds *discoveryService) SetLogger(log *logger.Logger) {
        ds.shardingKeyRepo.log = log
 }
 
-func (ds *discoveryService) navigateByLocator(metadata *commonv1.Metadata, 
tagFamilies []*modelv1.TagFamilyForWrite) (pbv1.EntityValues, common.ShardID, 
error) {
+func (ds *discoveryService) navigateByLocator(metadata *commonv1.Metadata, 
tagFamilies []*modelv1.TagFamilyForWrite,
+       specEntityLocator *specLocator, specShardingKeyLocator *specLocator,
+) (pbv1.EntityValues, common.ShardID, error) {
        shardNum, existed := ds.groupRepo.shardNum(metadata.Group)
        if !existed {
                return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding the shard num by: %v", metadata)
        }
        id := getID(metadata)
-       entityLocator, existed := ds.entityRepo.getLocator(id)
-       if !existed {
-               return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding the entity locator by: %v", metadata)
-       }
-       entityValues, shardID, err := entityLocator.Locate(metadata.Name, 
tagFamilies, shardNum)
-       if err != nil {
-               return nil, common.ShardID(0), err
-       }
-       shardingKeyLocator, existed := ds.shardingKeyRepo.getLocator(id)
-       if !existed {
-               return entityValues, shardID, nil
+       var entityValues pbv1.EntityValues
+       var shardID common.ShardID
+       var err error
+       if specEntityLocator != nil {
+               entityValues, shardID, err = 
specEntityLocator.Locate(metadata.Name, tagFamilies, shardNum)
+               if err != nil {
+                       return nil, common.ShardID(0), err
+               }
+       } else {
+               entityLocator, existed := ds.entityRepo.getLocator(id)
+               if !existed {
+                       return nil, common.ShardID(0), 
errors.Wrapf(errNotExist, "finding the entity locator by: %v", metadata)
+               }
+               entityValues, shardID, err = 
entityLocator.Locate(metadata.Name, tagFamilies, shardNum)
+               if err != nil {
+                       return nil, common.ShardID(0), err
+               }
        }
-       _, shardID, err = shardingKeyLocator.Locate(metadata.Name, tagFamilies, 
shardNum)
-       if err != nil {
-               return nil, common.ShardID(0), err
+       if specShardingKeyLocator != nil {
+               _, shardID, err = specShardingKeyLocator.Locate(metadata.Name, 
tagFamilies, shardNum)
+               if err != nil {
+                       return nil, common.ShardID(0), err
+               }
+       } else {
+               shardingKeyLocator, existed := ds.shardingKeyRepo.getLocator(id)
+               if existed {
+                       _, shardID, err = 
shardingKeyLocator.Locate(metadata.Name, tagFamilies, shardNum)
+                       if err != nil {
+                               return nil, common.ShardID(0), err
+                       }
+               }
        }
        return entityValues, shardID, nil
 }
@@ -192,6 +211,7 @@ type entityRepo struct {
        log             *logger.Logger
        entitiesMap     map[identity]partition.Locator
        measureMap      map[identity]*databasev1.Measure
+       streamMap       map[identity]*databasev1.Stream
        traceMap        map[identity]*databasev1.Trace
        traceIDIndexMap map[identity]int // Cache trace ID tag index
        sync.RWMutex
@@ -255,11 +275,14 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
        e.RWMutex.Lock()
        defer e.RWMutex.Unlock()
        e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, 
ModRevision: modRevision}
-       if schemaMetadata.Kind == schema.KindMeasure {
+       switch schemaMetadata.Kind {
+       case schema.KindMeasure:
                measure := schemaMetadata.Spec.(*databasev1.Measure)
                e.measureMap[id] = measure
-       } else {
-               delete(e.measureMap, id) // Ensure measure is not stored for 
streams
+       case schema.KindStream:
+               stream := schemaMetadata.Spec.(*databasev1.Stream)
+               e.streamMap[id] = stream
+       default:
        }
 }
 
@@ -301,6 +324,7 @@ func (e *entityRepo) OnDelete(schemaMetadata 
schema.Metadata) {
        defer e.RWMutex.Unlock()
        delete(e.entitiesMap, id)
        delete(e.measureMap, id)      // Clean up measure
+       delete(e.streamMap, id)       // Clean up stream
        delete(e.traceMap, id)        // Clean up trace
        delete(e.traceIDIndexMap, id) // Clean up trace ID index
 }
@@ -342,6 +366,13 @@ func (e *entityRepo) getMeasure(id identity) 
(*databasev1.Measure, bool) {
        return m, ok
 }
 
+func (e *entityRepo) getStream(id identity) (*databasev1.Stream, bool) {
+       e.RWMutex.RLock()
+       defer e.RWMutex.RUnlock()
+       s, ok := e.streamMap[id]
+       return s, ok
+}
+
 var _ schema.EventHandler = (*shardingKeyRepo)(nil)
 
 type shardingKeyRepo struct {
diff --git a/banyand/liaison/grpc/locator.go b/banyand/liaison/grpc/locator.go
new file mode 100644
index 00000000..7b2d986c
--- /dev/null
+++ b/banyand/liaison/grpc/locator.go
@@ -0,0 +1,120 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "github.com/apache/skywalking-banyandb/api/common"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/partition"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+type tagFamilySpec interface {
+       GetName() string
+       GetTagNames() []string
+}
+
+type specLocator struct {
+       tagLocators []partition.TagLocator
+}
+
+func newSpecLocator[T tagFamilySpec](schemaFamilies 
[]*databasev1.TagFamilySpec, tagNames []string, specFamilies []T) *specLocator {
+       specFamilyMap, specTagMaps := buildSpecMaps(specFamilies)
+       locator := make([]partition.TagLocator, 0, len(tagNames))
+       for _, tagName := range tagNames {
+               familyIdx, tagIdx := findTagInSpec(schemaFamilies, tagName, 
specFamilyMap, specTagMaps)
+               locator = append(locator, partition.TagLocator{FamilyOffset: 
familyIdx, TagOffset: tagIdx})
+       }
+       return &specLocator{tagLocators: locator}
+}
+
+func buildSpecMaps[T tagFamilySpec](specFamilies []T) (map[string]int, 
map[string]map[string]int) {
+       specFamilyMap := make(map[string]int, len(specFamilies))
+       specTagMaps := make(map[string]map[string]int, len(specFamilies))
+       for i, specFamily := range specFamilies {
+               specFamilyMap[specFamily.GetName()] = i
+               tagMap := make(map[string]int, len(specFamily.GetTagNames()))
+               for j, tagName := range specFamily.GetTagNames() {
+                       tagMap[tagName] = j
+               }
+               specTagMaps[specFamily.GetName()] = tagMap
+       }
+       return specFamilyMap, specTagMaps
+}
+
+func findTagInSpec(schemaFamilies []*databasev1.TagFamilySpec, tagName string,
+       specFamilyMap map[string]int, specTagMaps map[string]map[string]int,
+) (familyIdx, tagIdx int) {
+       for _, schemaFamily := range schemaFamilies {
+               for _, schemaTag := range schemaFamily.GetTags() {
+                       if schemaTag.GetName() != tagName {
+                               continue
+                       }
+                       fIdx, ok := specFamilyMap[schemaFamily.GetName()]
+                       if !ok {
+                               return -1, -1
+                       }
+                       tagMap := specTagMaps[schemaFamily.GetName()]
+                       if tagMap == nil {
+                               return -1, -1
+                       }
+                       tIdx, ok := tagMap[tagName]
+                       if !ok {
+                               return -1, -1
+                       }
+                       return fIdx, tIdx
+               }
+       }
+       return -1, -1
+}
+
+// Locate finds the entity values and shard ID using the spec locator.
+func (l *specLocator) Locate(subject string, tagFamilies 
[]*modelv1.TagFamilyForWrite, shardNum uint32) (pbv1.EntityValues, 
common.ShardID, error) {
+       entity, tagValues, err := l.Find(subject, tagFamilies)
+       if err != nil {
+               return nil, 0, err
+       }
+       id, err := partition.ShardID(entity.Marshal(), shardNum)
+       if err != nil {
+               return nil, 0, err
+       }
+       return tagValues, common.ShardID(id), nil
+}
+
+// Find finds the entity and entity values from tag families.
+func (l *specLocator) Find(subject string, tagFamilies 
[]*modelv1.TagFamilyForWrite) (pbv1.Entity, pbv1.EntityValues, error) {
+       entityValues := make(pbv1.EntityValues, len(l.tagLocators)+1)
+       entityValues[0] = pbv1.EntityStrValue(subject)
+       for i, index := range l.tagLocators {
+               if index.FamilyOffset < 0 || index.TagOffset < 0 {
+                       entityValues[i+1] = &modelv1.TagValue{Value: 
&modelv1.TagValue_Null{}}
+                       continue
+               }
+               tag, err := partition.GetTagByOffset(tagFamilies, 
index.FamilyOffset, index.TagOffset)
+               if err != nil {
+                       return nil, nil, err
+               }
+               entityValues[i+1] = tag
+       }
+       entity, err := entityValues.ToEntity()
+       if err != nil {
+               return nil, nil, err
+       }
+       return entity, entityValues, nil
+}
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 001b6d02..125c8aea 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -29,7 +29,6 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/queue"
@@ -37,12 +36,23 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+type measureTagFamilySpec struct {
+       *measurev1.TagFamilySpec
+}
+
+func (m measureTagFamilySpec) GetName() string {
+       return m.TagFamilySpec.GetName()
+}
+
+func (m measureTagFamilySpec) GetTagNames() []string {
+       return m.TagFamilySpec.GetTagNames()
+}
+
 type measureService struct {
        measurev1.UnimplementedMeasureServiceServer
        ingestionAccessLog accesslog.Log
@@ -87,6 +97,8 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
 
        var metadata *commonv1.Metadata
        var spec *measurev1.DataPointSpec
+       var specEntityLocator *specLocator
+       var specShardingKeyLocator *specLocator
        isFirstRequest := true
        nodeMetadataSent := make(map[string]bool)
        nodeSpecSent := make(map[string]bool)
@@ -118,10 +130,10 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                        return errors.New("metadata is required for the first 
request of gRPC stream")
                }
                isFirstRequest = false
-
                if writeRequest.GetDataPointSpec() != nil {
                        spec = writeRequest.GetDataPointSpec()
                        nodeSpecSent = make(map[string]bool)
+                       specEntityLocator, specShardingKeyLocator = 
ms.buildSpecLocators(metadata, spec)
                }
 
                ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, 
"measure", "write")
@@ -130,12 +142,35 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                        continue
                }
 
-               if err := ms.processAndPublishRequest(ctx, writeRequest, 
metadata, spec, publisher, &succeedSent, measure, nodeMetadataSent, 
nodeSpecSent); err != nil {
+               if err := ms.processAndPublishRequest(ctx, writeRequest, 
metadata, spec,
+                       specEntityLocator, specShardingKeyLocator, publisher, 
&succeedSent, measure, nodeMetadataSent, nodeSpecSent); err != nil {
                        continue
                }
        }
 }
 
+func (ms *measureService) buildSpecLocators(metadata *commonv1.Metadata, spec 
*measurev1.DataPointSpec) (*specLocator, *specLocator) {
+       if spec == nil {
+               return nil, nil
+       }
+       id := getID(metadata)
+       measure, ok := ms.entityRepo.getMeasure(id)
+       if !ok {
+               return nil, nil
+       }
+       specFamilies := make([]tagFamilySpec, len(spec.GetTagFamilySpec()))
+       for i, f := range spec.GetTagFamilySpec() {
+               specFamilies[i] = measureTagFamilySpec{f}
+       }
+       entityLocator := newSpecLocator(measure.GetTagFamilies(), 
measure.GetEntity().GetTagNames(), specFamilies)
+       var shardingKeyLocator *specLocator
+       shardingKey := measure.GetShardingKey()
+       if shardingKey != nil && len(shardingKey.GetTagNames()) > 0 {
+               shardingKeyLocator = newSpecLocator(measure.GetTagFamilies(), 
shardingKey.GetTagNames(), specFamilies)
+       }
+       return entityLocator, shardingKeyLocator
+}
+
 func (ms *measureService) validateWriteRequest(writeRequest 
*measurev1.WriteRequest,
        metadata *commonv1.Metadata, measure 
measurev1.MeasureService_WriteServer,
 ) modelv1.Status {
@@ -148,7 +183,7 @@ func (ms *measureService) validateWriteRequest(writeRequest 
*measurev1.WriteRequ
        if metadata.ModRevision > 0 {
                measureCache, existed := 
ms.entityRepo.getLocator(getID(metadata))
                if !existed {
-                       ms.l.Error().Stringer("written", 
writeRequest).Msg("failed to measure schema not found")
+                       ms.l.Error().Stringer("written", 
writeRequest).Msg("measure schema not found")
                        ms.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND, 
writeRequest.GetMessageId(), measure)
                        return modelv1.Status_STATUS_NOT_FOUND
                }
@@ -163,8 +198,9 @@ func (ms *measureService) validateWriteRequest(writeRequest 
*measurev1.WriteRequ
 }
 
 func (ms *measureService) processAndPublishRequest(ctx context.Context, 
writeRequest *measurev1.WriteRequest,
-       metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, publisher 
queue.BatchPublisher,
-       succeedSent *[]succeedSentMessage, measure 
measurev1.MeasureService_WriteServer,
+       metadata *commonv1.Metadata, spec *measurev1.DataPointSpec,
+       specEntityLocator *specLocator, specShardingKeyLocator *specLocator,
+       publisher queue.BatchPublisher, succeedSent *[]succeedSentMessage, 
measure measurev1.MeasureService_WriteServer,
        nodeMetadataSent map[string]bool, nodeSpecSent map[string]bool,
 ) error {
        // Retry with backoff when encountering errNotExist
@@ -176,7 +212,7 @@ func (ms *measureService) processAndPublishRequest(ctx 
context.Context, writeReq
                retryInterval := 10 * time.Millisecond
                startTime := time.Now()
                for {
-                       tagValues, shardID, err = ms.navigate(metadata, 
writeRequest, spec)
+                       tagValues, shardID, err = ms.navigate(metadata, 
writeRequest, specEntityLocator, specShardingKeyLocator)
                        if err == nil || !errors.Is(err, errNotExist) || 
time.Since(startTime) > ms.maxWaitDuration {
                                break
                        }
@@ -189,7 +225,7 @@ func (ms *measureService) processAndPublishRequest(ctx 
context.Context, writeReq
                        }
                }
        } else {
-               tagValues, shardID, err = ms.navigate(metadata, writeRequest, 
spec)
+               tagValues, shardID, err = ms.navigate(metadata, writeRequest, 
specEntityLocator, specShardingKeyLocator)
        }
 
        if err != nil {
@@ -266,134 +302,11 @@ func (ms *measureService) publishToNodes(ctx 
context.Context, writeRequest *meas
        return []string{nodeID}, nil
 }
 
-func (ms *measureService) navigate(metadata *commonv1.Metadata,
-       writeRequest *measurev1.WriteRequest, spec *measurev1.DataPointSpec,
+func (ms *measureService) navigate(metadata *commonv1.Metadata, writeRequest 
*measurev1.WriteRequest,
+       specEntityLocator *specLocator, specShardingKeyLocator *specLocator,
 ) (pbv1.EntityValues, common.ShardID, error) {
        tagFamilies := writeRequest.GetDataPoint().GetTagFamilies()
-       if spec == nil {
-               return ms.navigateByLocator(metadata, tagFamilies)
-       }
-       return ms.navigateByTagSpec(metadata, spec, tagFamilies)
-}
-
-func (ms *measureService) navigateByTagSpec(
-       metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, tagFamilies 
[]*modelv1.TagFamilyForWrite,
-) (pbv1.EntityValues, common.ShardID, error) {
-       shardNum, existed := ms.groupRepo.shardNum(metadata.Group)
-       if !existed {
-               return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding the shard num by: %v", metadata)
-       }
-       id := getID(metadata)
-       measure, ok := ms.entityRepo.getMeasure(id)
-       if !ok {
-               return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding measure schema by: %v", metadata)
-       }
-       specFamilyMap, specTagMaps := ms.buildSpecMaps(spec)
-
-       entityValues := ms.findTagValuesByNames(
-               metadata.Name,
-               measure.GetTagFamilies(),
-               tagFamilies,
-               measure.GetEntity().GetTagNames(),
-               specFamilyMap,
-               specTagMaps,
-       )
-       entity, err := entityValues.ToEntity()
-       if err != nil {
-               return nil, common.ShardID(0), err
-       }
-
-       shardingKey := measure.GetShardingKey()
-       if shardingKey != nil && len(shardingKey.GetTagNames()) > 0 {
-               shardingKeyValues := ms.findTagValuesByNames(
-                       metadata.Name,
-                       measure.GetTagFamilies(),
-                       tagFamilies,
-                       shardingKey.GetTagNames(),
-                       specFamilyMap,
-                       specTagMaps,
-               )
-               shardingEntity, shardingErr := shardingKeyValues.ToEntity()
-               if shardingErr != nil {
-                       return nil, common.ShardID(0), shardingErr
-               }
-               shardID, shardingErr := 
partition.ShardID(shardingEntity.Marshal(), shardNum)
-               if shardingErr != nil {
-                       return nil, common.ShardID(0), shardingErr
-               }
-               return entityValues, common.ShardID(shardID), nil
-       }
-
-       shardID, err := partition.ShardID(entity.Marshal(), shardNum)
-       if err != nil {
-               return nil, common.ShardID(0), err
-       }
-       return entityValues, common.ShardID(shardID), nil
-}
-
-func (ms *measureService) buildSpecMaps(spec *measurev1.DataPointSpec) 
(map[string]int, map[string]map[string]int) {
-       specFamilyMap := make(map[string]int)
-       specTagMaps := make(map[string]map[string]int)
-       for i, specFamily := range spec.GetTagFamilySpec() {
-               specFamilyMap[specFamily.GetName()] = i
-               tagMap := make(map[string]int)
-               for j, tagName := range specFamily.GetTagNames() {
-                       tagMap[tagName] = j
-               }
-               specTagMaps[specFamily.GetName()] = tagMap
-       }
-       return specFamilyMap, specTagMaps
-}
-
-func (ms *measureService) findTagValuesByNames(
-       subject string,
-       schemaFamilies []*databasev1.TagFamilySpec,
-       srcTagFamilies []*modelv1.TagFamilyForWrite,
-       tagNames []string,
-       specFamilyMap map[string]int,
-       specTagMaps map[string]map[string]int,
-) pbv1.EntityValues {
-       entityValues := make(pbv1.EntityValues, len(tagNames)+1)
-       entityValues[0] = pbv1.EntityStrValue(subject)
-       for i, tagName := range tagNames {
-               tagValue := ms.findTagValueByName(schemaFamilies, 
srcTagFamilies, tagName, specFamilyMap, specTagMaps)
-               if tagValue == nil {
-                       entityValues[i+1] = &modelv1.TagValue{Value: 
&modelv1.TagValue_Null{}}
-               } else {
-                       entityValues[i+1] = tagValue
-               }
-       }
-       return entityValues
-}
-
-func (ms *measureService) findTagValueByName(
-       schemaFamilies []*databasev1.TagFamilySpec,
-       srcTagFamilies []*modelv1.TagFamilyForWrite,
-       tagName string,
-       specFamilyMap map[string]int,
-       specTagMaps map[string]map[string]int,
-) *modelv1.TagValue {
-       for _, schemaFamily := range schemaFamilies {
-               for _, schemaTag := range schemaFamily.GetTags() {
-                       if schemaTag.GetName() != tagName {
-                               continue
-                       }
-                       familyIdx, ok := specFamilyMap[schemaFamily.GetName()]
-                       if !ok || familyIdx >= len(srcTagFamilies) {
-                               return nil
-                       }
-                       tagMap := specTagMaps[schemaFamily.GetName()]
-                       if tagMap == nil {
-                               return nil
-                       }
-                       tagIdx, ok := tagMap[tagName]
-                       if !ok || tagIdx >= 
len(srcTagFamilies[familyIdx].GetTags()) {
-                               return nil
-                       }
-                       return srcTagFamilies[familyIdx].GetTags()[tagIdx]
-               }
-       }
-       return nil
+       return ms.navigateByLocator(metadata, tagFamilies, specEntityLocator, 
specShardingKeyLocator)
 }
 
 func (ms *measureService) sendReply(metadata *commonv1.Metadata, status 
modelv1.Status, messageID uint64, measure measurev1.MeasureService_WriteServer) 
{
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 81cd420e..af570cfa 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -41,6 +41,18 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+type streamTagFamilySpec struct {
+       *streamv1.TagFamilySpec
+}
+
+func (s streamTagFamilySpec) GetName() string {
+       return s.TagFamilySpec.GetName()
+}
+
+func (s streamTagFamilySpec) GetTagNames() []string {
+       return s.TagFamilySpec.GetTagNames()
+}
+
 type streamService struct {
        streamv1.UnimplementedStreamServiceServer
        ingestionAccessLog accesslog.Log
@@ -74,33 +86,63 @@ func (s *streamService) activeQueryAccessLog(root string, 
sampled bool) (err err
        return nil
 }
 
-func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest) 
error {
-       if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err != 
nil {
-               s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the 
element time is invalid")
-               return err
+func (s *streamService) validateWriteRequest(writeEntity 
*streamv1.WriteRequest,
+       metadata *commonv1.Metadata, stream streamv1.StreamService_WriteServer,
+) modelv1.Status {
+       if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); 
errTime != nil {
+               s.l.Error().Err(errTime).Stringer("written", 
writeEntity).Msg("the element time is invalid")
+               s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, 
writeEntity.GetMessageId(), stream)
+               return modelv1.Status_STATUS_INVALID_TIMESTAMP
        }
-       return nil
-}
 
-func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest) 
error {
-       if writeEntity.Metadata.ModRevision > 0 {
-               streamCache, existed := 
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+       if metadata.ModRevision > 0 {
+               streamCache, existed := s.entityRepo.getLocator(getID(metadata))
                if !existed {
-                       return errors.New("stream schema not found")
+                       s.l.Error().Stringer("written", 
writeEntity).Msg("stream schema not found")
+                       s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND, 
writeEntity.GetMessageId(), stream)
+                       return modelv1.Status_STATUS_NOT_FOUND
                }
-               if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
-                       return errors.New("expired stream schema")
+               if metadata.ModRevision != streamCache.ModRevision {
+                       s.l.Error().Stringer("written", writeEntity).Msg("the 
stream schema is expired")
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream)
+                       return modelv1.Status_STATUS_EXPIRED_SCHEMA
                }
        }
-       return nil
+
+       return modelv1.Status_STATUS_SUCCEED
+}
+
+func (s *streamService) navigate(metadata *commonv1.Metadata, writeRequest 
*streamv1.WriteRequest,
+       specLocator *specLocator,
+) (pbv1.EntityValues, common.ShardID, error) {
+       tagFamilies := writeRequest.GetElement().GetTagFamilies()
+       return s.navigateByLocator(metadata, tagFamilies, specLocator, nil)
 }
 
-func (s *streamService) navigateWithRetry(writeEntity *streamv1.WriteRequest) 
(tagValues pbv1.EntityValues, shardID common.ShardID, err error) {
+func (s *streamService) buildSpecLocator(metadata *commonv1.Metadata, spec 
[]*streamv1.TagFamilySpec) *specLocator {
+       if spec == nil {
+               return nil
+       }
+       id := getID(metadata)
+       stream, ok := s.entityRepo.getStream(id)
+       if !ok {
+               return nil
+       }
+       specFamilies := make([]tagFamilySpec, len(spec))
+       for i, f := range spec {
+               specFamilies[i] = streamTagFamilySpec{f}
+       }
+       return newSpecLocator(stream.GetTagFamilies(), 
stream.GetEntity().GetTagNames(), specFamilies)
+}
+
+func (s *streamService) navigateWithRetry(writeEntity *streamv1.WriteRequest, 
metadata *commonv1.Metadata,
+       specLocator *specLocator,
+) (tagValues pbv1.EntityValues, shardID common.ShardID, err error) {
        if s.maxWaitDuration > 0 {
                retryInterval := 10 * time.Millisecond
                startTime := time.Now()
                for {
-                       tagValues, shardID, err = 
s.navigateByLocator(writeEntity.GetMetadata(), 
writeEntity.GetElement().GetTagFamilies())
+                       tagValues, shardID, err = s.navigate(metadata, 
writeEntity, specLocator)
                        if err == nil || !errors.Is(err, errNotExist) || 
time.Since(startTime) > s.maxWaitDuration {
                                return
                        }
@@ -111,26 +153,39 @@ func (s *streamService) navigateWithRetry(writeEntity 
*streamv1.WriteRequest) (t
                        }
                }
        }
-       return s.navigateByLocator(writeEntity.GetMetadata(), 
writeEntity.GetElement().GetTagFamilies())
+       return s.navigate(metadata, writeEntity, specLocator)
 }
 
 func (s *streamService) publishMessages(
        ctx context.Context,
        publisher queue.BatchPublisher,
        writeEntity *streamv1.WriteRequest,
+       metadata *commonv1.Metadata,
+       spec []*streamv1.TagFamilySpec,
        shardID common.ShardID,
        tagValues pbv1.EntityValues,
+       nodeMetadataSent map[string]bool,
+       nodeSpecSent map[string]bool,
 ) ([]string, error) {
        iwr := &streamv1.InternalWriteRequest{
                Request:      writeEntity,
                ShardId:      uint32(shardID),
                EntityValues: tagValues[1:].Encode(),
        }
-       nodeID, err := 
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), 
writeEntity.GetMetadata().GetName(), uint32(shardID), 0)
+       nodeID, err := s.nodeRegistry.Locate(metadata.GetGroup(), 
metadata.GetName(), uint32(shardID), 0)
        if err != nil {
                return nil, err
        }
 
+       if !nodeMetadataSent[nodeID] {
+               iwr.Request.Metadata = metadata
+               nodeMetadataSent[nodeID] = true
+       }
+       if spec != nil && !nodeSpecSent[nodeID] {
+               iwr.Request.TagFamilySpec = spec
+               nodeSpecSent[nodeID] = true
+       }
+
        message := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
        if _, err := publisher.Publish(ctx, data.TopicStreamWrite, message); 
err != nil {
                return nil, err
@@ -138,20 +193,24 @@ func (s *streamService) publishMessages(
        return []string{nodeID}, nil
 }
 
-func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error 
{
-       reply := func(metadata *commonv1.Metadata, status modelv1.Status, 
messageId uint64, stream streamv1.StreamService_WriteServer, logger 
*logger.Logger) {
-               if status != modelv1.Status_STATUS_SUCCEED {
-                       s.metrics.totalStreamMsgReceivedErr.Inc(1, 
metadata.Group, "stream", "write")
-               }
-               s.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "stream", 
"write")
-               if errResp := stream.Send(&streamv1.WriteResponse{Metadata: 
metadata, Status: status.String(), MessageId: messageId}); errResp != nil {
-                       if dl := logger.Debug(); dl.Enabled() {
-                               dl.Err(errResp).Msg("failed to send stream 
write response")
-                       }
-                       s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, 
"stream", "write")
+func (s *streamService) sendReply(metadata *commonv1.Metadata, status 
modelv1.Status, messageID uint64, stream streamv1.StreamService_WriteServer) {
+       if metadata == nil {
+               s.l.Error().Stringer("status", status).Msg("metadata is nil, 
cannot send reply")
+               return
+       }
+       if status != modelv1.Status_STATUS_SUCCEED {
+               s.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group, 
"stream", "write")
+       }
+       s.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "stream", "write")
+       if errResp := stream.Send(&streamv1.WriteResponse{Metadata: metadata, 
Status: status.String(), MessageId: messageID}); errResp != nil {
+               if dl := s.l.Debug(); dl.Enabled() {
+                       dl.Err(errResp).Msg("failed to send stream write 
response")
                }
+               s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, 
"stream", "write")
        }
+}
 
+func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error 
{
        s.metrics.totalStreamStarted.Inc(1, "stream", "write")
        publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
        start := time.Now()
@@ -169,7 +228,7 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                                        }
                                }
                        }
-                       reply(ssm.metadata, code, ssm.messageID, stream, s.l)
+                       s.sendReply(ssm.metadata, code, ssm.messageID, stream)
                }
                if err != nil {
                        s.l.Error().Err(err).Msg("failed to close the 
publisher")
@@ -182,6 +241,14 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
        }()
 
        ctx := stream.Context()
+
+       var metadata *commonv1.Metadata
+       var spec []*streamv1.TagFamilySpec
+       var specLocator *specLocator
+       isFirstRequest := true
+       nodeMetadataSent := make(map[string]bool)
+       nodeSpecSent := make(map[string]bool)
+
        for {
                select {
                case <-ctx.Done():
@@ -200,30 +267,32 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                        return err
                }
 
-               requestCount++
-               s.metrics.totalStreamMsgReceived.Inc(1, 
writeEntity.Metadata.Group, "stream", "write")
-
-               if err = s.validateTimestamp(writeEntity); err != nil {
-                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetMessageId(), stream, 
s.l)
-                       continue
+               if writeEntity.GetMetadata() != nil {
+                       metadata = writeEntity.GetMetadata()
+                       nodeMetadataSent = make(map[string]bool)
+               } else if isFirstRequest {
+                       s.l.Error().Msg("metadata is required for the first 
request of gRPC stream")
+                       s.sendReply(nil, 
modelv1.Status_STATUS_METADATA_REQUIRED, writeEntity.GetMessageId(), stream)
+                       return errors.New("metadata is required for the first 
request of gRPC stream")
+               }
+               isFirstRequest = false
+               if writeEntity.GetTagFamilySpec() != nil {
+                       spec = writeEntity.GetTagFamilySpec()
+                       nodeSpecSent = make(map[string]bool)
+                       specLocator = s.buildSpecLocator(metadata, spec)
                }
 
-               if err = s.validateMetadata(writeEntity); err != nil {
-                       status := modelv1.Status_STATUS_INTERNAL_ERROR
-                       if errors.Is(err, errors.New("stream schema not 
found")) {
-                               status = modelv1.Status_STATUS_NOT_FOUND
-                       } else if errors.Is(err, errors.New("expired stream 
schema")) {
-                               status = modelv1.Status_STATUS_EXPIRED_SCHEMA
-                       }
-                       s.l.Error().Err(err).Stringer("written", 
writeEntity).Msg("metadata validation failed")
-                       reply(writeEntity.GetMetadata(), status, 
writeEntity.GetMessageId(), stream, s.l)
+               requestCount++
+               s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, 
"stream", "write")
+
+               if s.validateWriteRequest(writeEntity, metadata, stream) != 
modelv1.Status_STATUS_SUCCEED {
                        continue
                }
 
-               tagValues, shardID, err := s.navigateWithRetry(writeEntity)
+               tagValues, shardID, err := s.navigateWithRetry(writeEntity, 
metadata, specLocator)
                if err != nil {
                        s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("navigation failed")
-                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream)
                        continue
                }
 
@@ -233,15 +302,15 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                        }
                }
 
-               nodes, err := s.publishMessages(ctx, publisher, writeEntity, 
shardID, tagValues)
+               nodes, err := s.publishMessages(ctx, publisher, writeEntity, 
metadata, spec, shardID, tagValues, nodeMetadataSent, nodeSpecSent)
                if err != nil {
                        s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("publishing failed")
-                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream)
                        continue
                }
 
                succeedSent = append(succeedSent, succeedSentMessage{
-                       metadata:  writeEntity.GetMetadata(),
+                       metadata:  metadata,
                        messageID: writeEntity.GetMessageId(),
                        nodes:     nodes,
                })
diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go
index 89125a21..083927c8 100644
--- a/banyand/liaison/grpc/trace.go
+++ b/banyand/liaison/grpc/trace.go
@@ -19,7 +19,6 @@ package grpc
 
 import (
        "context"
-       "hash/fnv"
        "io"
        "time"
 
@@ -41,6 +40,30 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+type traceSpecLocator struct {
+       traceIDIndex   int
+       timestampIndex int
+}
+
+func newTraceSpecLocator(spec *tracev1.TagSpec, traceIDTagName, 
timestampTagName string) *traceSpecLocator {
+       locator := &traceSpecLocator{
+               traceIDIndex:   -1,
+               timestampIndex: -1,
+       }
+       if spec == nil {
+               return locator
+       }
+       for i, tagName := range spec.GetTagNames() {
+               if tagName == traceIDTagName {
+                       locator.traceIDIndex = i
+               }
+               if tagName == timestampTagName {
+                       locator.timestampIndex = i
+               }
+       }
+       return locator
+}
+
 type traceService struct {
        tracev1.UnimplementedTraceServiceServer
        ingestionAccessLog accesslog.Log
@@ -74,51 +97,115 @@ func (s *traceService) activeQueryAccessLog(root string, 
sampled bool) (err erro
        return nil
 }
 
-func (s *traceService) validateTimestamp(writeEntity *tracev1.WriteRequest) 
error {
-       // Get trace schema from entityRepo
-       id := getID(writeEntity.GetMetadata())
+func (s *traceService) validateWriteRequest(writeEntity *tracev1.WriteRequest,
+       metadata *commonv1.Metadata, specLocator *traceSpecLocator, stream 
tracev1.TraceService_WriteServer,
+) modelv1.Status {
+       id := getID(metadata)
        traceEntity, existed := s.entityRepo.getTrace(id)
        if !existed {
-               return errors.New("trace schema not found")
+               s.l.Error().Stringer("written", writeEntity).Msg("trace schema 
not found")
+               s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND, 
writeEntity.GetVersion(), stream)
+               return modelv1.Status_STATUS_NOT_FOUND
        }
 
-       timestampTagName := traceEntity.GetTimestampTagName()
-       for _, tag := range writeEntity.GetTags() {
-               if tag.GetTimestamp() != nil {
-                       if err := timestamp.CheckPb(tag.GetTimestamp()); err != 
nil {
-                               s.l.Error().Stringer("written", 
writeEntity).Err(err).Msg("the timestamp is invalid")
-                               return err
+       var foundTimestamp bool
+       if specLocator != nil && specLocator.timestampIndex >= 0 {
+               tags := writeEntity.GetTags()
+               if specLocator.timestampIndex < len(tags) {
+                       tagValue := tags[specLocator.timestampIndex]
+                       if tagValue != nil && tagValue.GetTimestamp() != nil {
+                               if errTime := 
timestamp.CheckPb(tagValue.GetTimestamp()); errTime != nil {
+                                       
s.l.Error().Err(errTime).Stringer("written", writeEntity).Msg("the timestamp is 
invalid")
+                                       s.sendReply(metadata, 
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetVersion(), stream)
+                                       return 
modelv1.Status_STATUS_INVALID_TIMESTAMP
+                               }
+                               foundTimestamp = true
                        }
-                       return nil
                }
        }
+       if !foundTimestamp {
+               for _, tag := range writeEntity.GetTags() {
+                       if tag.GetTimestamp() != nil {
+                               if errTime := 
timestamp.CheckPb(tag.GetTimestamp()); errTime != nil {
+                                       
s.l.Error().Err(errTime).Stringer("written", writeEntity).Msg("the timestamp is 
invalid")
+                                       s.sendReply(metadata, 
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetVersion(), stream)
+                                       return 
modelv1.Status_STATUS_INVALID_TIMESTAMP
+                               }
+                               foundTimestamp = true
+                               break
+                       }
+               }
+       }
+       if !foundTimestamp {
+               timestampTagName := traceEntity.GetTimestampTagName()
+               s.l.Error().Stringer("written", writeEntity).Msg("timestamp tag 
not found: " + timestampTagName)
+               s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, 
writeEntity.GetVersion(), stream)
+               return modelv1.Status_STATUS_INVALID_TIMESTAMP
+       }
 
-       return errors.New("timestamp tag not found: " + timestampTagName)
+       if metadata.ModRevision > 0 {
+               if metadata.ModRevision != 
traceEntity.GetMetadata().GetModRevision() {
+                       s.l.Error().Stringer("written", writeEntity).Msg("the 
trace schema is expired")
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetVersion(), stream)
+                       return modelv1.Status_STATUS_EXPIRED_SCHEMA
+               }
+       }
+
+       return modelv1.Status_STATUS_SUCCEED
 }
 
-func (s *traceService) validateMetadata(writeEntity *tracev1.WriteRequest) 
error {
-       if writeEntity.Metadata.ModRevision > 0 {
-               traceCache, existed := 
s.entityRepo.getTrace(getID(writeEntity.GetMetadata()))
-               if !existed {
-                       return errors.New("trace schema not found")
-               }
-               if writeEntity.Metadata.ModRevision != 
traceCache.GetMetadata().GetModRevision() {
-                       return errors.New("expired trace schema")
+func (s *traceService) navigate(metadata *commonv1.Metadata,
+       writeRequest *tracev1.WriteRequest, specLocator *traceSpecLocator,
+) (common.ShardID, error) {
+       shardCount, existed := s.groupRepo.shardNum(metadata.GetGroup())
+       if !existed {
+               return 0, errors.Wrapf(errNotExist, "finding the shard num by: 
%v", metadata)
+       }
+       id := getID(metadata)
+       traceEntity, existed := s.entityRepo.getTrace(id)
+       if !existed {
+               return 0, errors.Wrapf(errNotExist, "finding trace schema by: 
%v", metadata)
+       }
+
+       var traceID string
+       var err error
+       if specLocator != nil && specLocator.traceIDIndex >= 0 {
+               tags := writeRequest.GetTags()
+               if specLocator.traceIDIndex < len(tags) {
+                       traceID, err = 
extractTraceIDFromTagValue(tags[specLocator.traceIDIndex])
+                       if err == nil {
+                               return s.shardID(traceID, shardCount), nil
+                       }
                }
        }
-       return nil
+
+       traceIDIndex, existed := s.entityRepo.getTraceIDIndex(id)
+       if !existed || traceIDIndex == -1 {
+               return 0, errors.New("trace ID tag not found in schema: " + 
traceEntity.GetTraceIdTagName())
+       }
+       traceID, err = s.extractTraceID(writeRequest.GetTags(), traceIDIndex)
+       if err != nil {
+               return 0, err
+       }
+       return s.shardID(traceID, shardCount), nil
+}
+
+func (s *traceService) shardID(traceID string, shardCount uint32) 
common.ShardID {
+       hash := convert.Hash([]byte(traceID))
+       return common.ShardID(hash % uint64(shardCount))
 }
 
 func (s *traceService) extractTraceID(tags []*modelv1.TagValue, traceIDIndex 
int) (string, error) {
        if len(tags) == 0 {
                return "", errors.New("no tags found")
        }
-
        if traceIDIndex < 0 || traceIDIndex >= len(tags) {
                return "", errors.New("trace ID tag index out of range")
        }
+       return extractTraceIDFromTagValue(tags[traceIDIndex])
+}
 
-       tag := tags[traceIDIndex]
+func extractTraceIDFromTagValue(tag *modelv1.TagValue) (string, error) {
        switch v := tag.GetValue().(type) {
        case *modelv1.TagValue_Str:
                return v.Str.GetValue(), nil
@@ -129,45 +216,16 @@ func (s *traceService) extractTraceID(tags 
[]*modelv1.TagValue, traceIDIndex int
        }
 }
 
-func (s *traceService) getTraceShardID(writeEntity *tracev1.WriteRequest) 
(common.ShardID, error) {
-       // Get shard count from group configuration
-       shardCount, existed := 
s.groupRepo.shardNum(writeEntity.GetMetadata().GetGroup())
-       if !existed {
-               return 0, errors.New("group not found or no shard 
configuration")
-       }
-
-       // Get cached trace ID index from entityRepo
-       id := getID(writeEntity.GetMetadata())
-       traceIDIndex, existed := s.entityRepo.getTraceIDIndex(id)
-       if !existed {
-               return 0, errors.New("trace schema not found")
-       }
-
-       if traceIDIndex == -1 {
-               return 0, errors.New("trace ID tag not found in schema")
-       }
-
-       traceID, err := s.extractTraceID(writeEntity.GetTags(), traceIDIndex)
-       if err != nil {
-               return 0, err
-       }
-
-       // Calculate shard ID using hash of trace ID
-       hasher := fnv.New32a()
-       hasher.Write([]byte(traceID))
-       hash := hasher.Sum32()
-
-       return common.ShardID(hash % shardCount), nil
-}
-
-func (s *traceService) getTraceShardIDWithRetry(writeEntity 
*tracev1.WriteRequest) (common.ShardID, error) {
+func (s *traceService) navigateWithRetry(writeEntity *tracev1.WriteRequest, 
metadata *commonv1.Metadata,
+       specLocator *traceSpecLocator,
+) (shardID common.ShardID, err error) {
        if s.maxWaitDuration > 0 {
                retryInterval := 10 * time.Millisecond
                startTime := time.Now()
                for {
-                       shardID, err := s.getTraceShardID(writeEntity)
+                       shardID, err = s.navigate(metadata, writeEntity, 
specLocator)
                        if err == nil || !errors.Is(err, errNotExist) || 
time.Since(startTime) > s.maxWaitDuration {
-                               return shardID, err
+                               return
                        }
                        time.Sleep(retryInterval)
                        retryInterval = time.Duration(float64(retryInterval) * 
1.5)
@@ -176,24 +234,42 @@ func (s *traceService) 
getTraceShardIDWithRetry(writeEntity *tracev1.WriteReques
                        }
                }
        }
-       return s.getTraceShardID(writeEntity)
+       return s.navigate(metadata, writeEntity, specLocator)
 }
 
 func (s *traceService) publishMessages(
        ctx context.Context,
        publisher queue.BatchPublisher,
        writeEntity *tracev1.WriteRequest,
+       metadata *commonv1.Metadata,
+       spec *tracev1.TagSpec,
        shardID common.ShardID,
+       nodeMetadataSent map[string]bool,
+       nodeSpecSent map[string]bool,
 ) ([]string, error) {
-       iwr := &tracev1.InternalWriteRequest{
-               ShardId: uint32(shardID),
-               Request: writeEntity,
-       }
-       nodeID, err := 
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), 
writeEntity.GetMetadata().GetName(), uint32(shardID), 0)
+       nodeID, err := s.nodeRegistry.Locate(metadata.GetGroup(), 
metadata.GetName(), uint32(shardID), 0)
        if err != nil {
                return nil, err
        }
 
+       requestToSend := &tracev1.WriteRequest{
+               Version: writeEntity.GetVersion(),
+               Tags:    writeEntity.GetTags(),
+               Span:    writeEntity.GetSpan(),
+       }
+       if !nodeMetadataSent[nodeID] {
+               requestToSend.Metadata = metadata
+               nodeMetadataSent[nodeID] = true
+       }
+       if spec != nil && !nodeSpecSent[nodeID] {
+               requestToSend.TagSpec = spec
+               nodeSpecSent[nodeID] = true
+       }
+       iwr := &tracev1.InternalWriteRequest{
+               ShardId: uint32(shardID),
+               Request: requestToSend,
+       }
+
        message := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
        if _, err := publisher.Publish(ctx, data.TopicTraceWrite, message); err 
!= nil {
                return nil, err
@@ -201,20 +277,24 @@ func (s *traceService) publishMessages(
        return []string{nodeID}, nil
 }
 
-func (s *traceService) Write(stream tracev1.TraceService_WriteServer) error {
-       reply := func(metadata *commonv1.Metadata, status modelv1.Status, 
version uint64, stream tracev1.TraceService_WriteServer, logger *logger.Logger) 
{
-               if status != modelv1.Status_STATUS_SUCCEED {
-                       s.metrics.totalStreamMsgReceivedErr.Inc(1, 
metadata.Group, "trace", "write")
-               }
-               s.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "trace", 
"write")
-               if errResp := stream.Send(&tracev1.WriteResponse{Metadata: 
metadata, Status: status.String(), Version: version}); errResp != nil {
-                       if dl := logger.Debug(); dl.Enabled() {
-                               dl.Err(errResp).Msg("failed to send trace write 
response")
-                       }
-                       s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, 
"trace", "write")
+func (s *traceService) sendReply(metadata *commonv1.Metadata, status 
modelv1.Status, version uint64, stream tracev1.TraceService_WriteServer) {
+       if metadata == nil {
+               s.l.Error().Stringer("status", status).Msg("metadata is nil, 
cannot send reply")
+               return
+       }
+       if status != modelv1.Status_STATUS_SUCCEED {
+               s.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group, 
"trace", "write")
+       }
+       s.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "trace", "write")
+       if errResp := stream.Send(&tracev1.WriteResponse{Metadata: metadata, 
Status: status.String(), Version: version}); errResp != nil {
+               if dl := s.l.Debug(); dl.Enabled() {
+                       dl.Err(errResp).Msg("failed to send trace write 
response")
                }
+               s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "trace", 
"write")
        }
+}
 
+func (s *traceService) Write(stream tracev1.TraceService_WriteServer) error {
        s.metrics.totalStreamStarted.Inc(1, "trace", "write")
        publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
        start := time.Now()
@@ -232,7 +312,7 @@ func (s *traceService) Write(stream 
tracev1.TraceService_WriteServer) error {
                                        }
                                }
                        }
-                       reply(ssm.metadata, code, ssm.messageID, stream, s.l)
+                       s.sendReply(ssm.metadata, code, ssm.messageID, stream)
                }
                if err != nil {
                        s.l.Error().Err(err).Msg("failed to close the 
publisher")
@@ -245,6 +325,14 @@ func (s *traceService) Write(stream 
tracev1.TraceService_WriteServer) error {
        }()
 
        ctx := stream.Context()
+
+       var metadata *commonv1.Metadata
+       var spec *tracev1.TagSpec
+       var specLocator *traceSpecLocator
+       isFirstRequest := true
+       nodeMetadataSent := make(map[string]bool)
+       nodeSpecSent := make(map[string]bool)
+
        for {
                select {
                case <-ctx.Done():
@@ -263,30 +351,37 @@ func (s *traceService) Write(stream 
tracev1.TraceService_WriteServer) error {
                        return err
                }
 
-               requestCount++
-               s.metrics.totalStreamMsgReceived.Inc(1, 
writeEntity.Metadata.Group, "trace", "write")
-
-               if err = s.validateTimestamp(writeEntity); err != nil {
-                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetVersion(), stream, s.l)
-                       continue
+               if writeEntity.GetMetadata() != nil {
+                       metadata = writeEntity.GetMetadata()
+                       nodeMetadataSent = make(map[string]bool)
+                       specLocator = nil
+               } else if isFirstRequest {
+                       s.l.Error().Msg("metadata is required for the first 
request of gRPC stream")
+                       s.sendReply(nil, 
modelv1.Status_STATUS_METADATA_REQUIRED, writeEntity.GetVersion(), stream)
+                       return errors.New("metadata is required for the first 
request of gRPC stream")
                }
-
-               if err = s.validateMetadata(writeEntity); err != nil {
-                       status := modelv1.Status_STATUS_INTERNAL_ERROR
-                       if errors.Is(err, errors.New("trace schema not found")) 
{
-                               status = modelv1.Status_STATUS_NOT_FOUND
-                       } else if errors.Is(err, errors.New("expired trace 
schema")) {
-                               status = modelv1.Status_STATUS_EXPIRED_SCHEMA
+               isFirstRequest = false
+               if writeEntity.GetTagSpec() != nil {
+                       spec = writeEntity.GetTagSpec()
+                       nodeSpecSent = make(map[string]bool)
+                       id := getID(metadata)
+                       traceEntity, existed := s.entityRepo.getTrace(id)
+                       if existed {
+                               specLocator = newTraceSpecLocator(spec, 
traceEntity.GetTraceIdTagName(), traceEntity.GetTimestampTagName())
                        }
-                       s.l.Error().Err(err).Stringer("written", 
writeEntity).Msg("metadata validation failed")
-                       reply(writeEntity.GetMetadata(), status, 
writeEntity.GetVersion(), stream, s.l)
+               }
+
+               requestCount++
+               s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, 
"trace", "write")
+
+               if s.validateWriteRequest(writeEntity, metadata, specLocator, 
stream) != modelv1.Status_STATUS_SUCCEED {
                        continue
                }
 
-               shardID, err := s.getTraceShardIDWithRetry(writeEntity)
+               shardID, err := s.navigateWithRetry(writeEntity, metadata, 
specLocator)
                if err != nil {
-                       s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("trace sharding failed")
-                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream, s.l)
+                       s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("navigation failed")
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream)
                        continue
                }
 
@@ -296,15 +391,15 @@ func (s *traceService) Write(stream 
tracev1.TraceService_WriteServer) error {
                        }
                }
 
-               nodes, err := s.publishMessages(ctx, publisher, writeEntity, 
shardID)
+               nodes, err := s.publishMessages(ctx, publisher, writeEntity, 
metadata, spec, shardID, nodeMetadataSent, nodeSpecSent)
                if err != nil {
                        s.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("publishing failed")
-                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream, s.l)
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream)
                        continue
                }
 
                succeedSent = append(succeedSent, succeedSentMessage{
-                       metadata:  writeEntity.GetMetadata(),
+                       metadata:  metadata,
                        messageID: writeEntity.GetVersion(),
                        nodes:     nodes,
                })
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index a9247c6a..d5283e53 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -24,6 +24,7 @@ import (
        "fmt"
        "math/big"
        "path"
+       "strings"
        "sync"
        "time"
 
@@ -270,7 +271,12 @@ func (e *etcdSchemaRegistry) prependNamespace(key string) 
string {
        if e.namespace == "" {
                return key
        }
-       return path.Join("/", e.namespace, key)
+       hasTrailingSlash := strings.HasSuffix(key, "/")
+       result := path.Join("/", e.namespace, key)
+       if hasTrailingSlash && !strings.HasSuffix(result, "/") {
+               result += "/"
+       }
+       return result
 }
 
 func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message 
proto.Message) error {
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index be566f09..b86f48d1 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -26,6 +26,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -69,13 +70,15 @@ func (w *writeQueueCallback) CheckHealth() *common.Error {
        return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "disk 
usage is too high, stop writing")
 }
 
-func (w *writeQueueCallback) handle(dst map[string]*elementsInQueue, 
writeEvent *streamv1.InternalWriteRequest) (map[string]*elementsInQueue, error) 
{
+func (w *writeQueueCallback) handle(dst map[string]*elementsInQueue, 
writeEvent *streamv1.InternalWriteRequest,
+       metadata *commonv1.Metadata, spec []*streamv1.TagFamilySpec,
+) (map[string]*elementsInQueue, error) {
        t := writeEvent.Request.Element.Timestamp.AsTime().Local()
        if err := timestamp.Check(t); err != nil {
                return nil, fmt.Errorf("invalid timestamp: %w", err)
        }
        ts := t.UnixNano()
-       eq, err := w.prepareElementsInQueue(dst, writeEvent)
+       eq, err := w.prepareElementsInQueue(dst, metadata)
        if err != nil {
                return nil, err
        }
@@ -83,15 +86,15 @@ func (w *writeQueueCallback) handle(dst 
map[string]*elementsInQueue, writeEvent
        if err != nil {
                return nil, err
        }
-       err = processElements(w.schemaRepo, et.elements, writeEvent, ts, 
&et.docs, &et.seriesDocs)
+       err = processElements(w.schemaRepo, et.elements, writeEvent, ts, 
&et.docs, &et.seriesDocs, metadata, spec)
        if err != nil {
                return nil, err
        }
        return dst, nil
 }
 
-func (w *writeQueueCallback) prepareElementsInQueue(dst 
map[string]*elementsInQueue, writeEvent *streamv1.InternalWriteRequest) 
(*elementsInQueue, error) {
-       gn := writeEvent.Request.Metadata.Group
+func (w *writeQueueCallback) prepareElementsInQueue(dst 
map[string]*elementsInQueue, metadata *commonv1.Metadata) (*elementsInQueue, 
error) {
+       gn := metadata.Group
        queue, err := w.schemaRepo.loadQueue(gn)
        if err != nil {
                return nil, fmt.Errorf("cannot load queue for group %s: %w", 
gn, err)
@@ -155,6 +158,8 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                return
        }
        groups := make(map[string]*elementsInQueue)
+       var metadata *commonv1.Metadata
+       var spec []*streamv1.TagFamilySpec
        for i := range events {
                var writeEvent *streamv1.InternalWriteRequest
                switch e := events[i].(type) {
@@ -170,8 +175,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                        w.l.Warn().Msg("invalid event data type")
                        continue
                }
+               req := writeEvent.Request
+               if req != nil && req.GetMetadata() != nil {
+                       metadata = req.GetMetadata()
+               }
+               if req != nil && req.GetTagFamilySpec() != nil {
+                       spec = req.GetTagFamilySpec()
+               }
                var err error
-               if groups, err = w.handle(groups, writeEvent); err != nil {
+               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
                        w.l.Error().Err(err).Msg("cannot handle write event")
                        groups = make(map[string]*elementsInQueue)
                        continue
diff --git a/banyand/stream/write_standalone.go 
b/banyand/stream/write_standalone.go
index b2b69771..c266cffa 100644
--- a/banyand/stream/write_standalone.go
+++ b/banyand/stream/write_standalone.go
@@ -26,6 +26,7 @@ import (
        "google.golang.org/protobuf/proto"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
@@ -69,13 +70,14 @@ func (w *writeCallback) CheckHealth() *common.Error {
 }
 
 func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent 
*streamv1.InternalWriteRequest,
+       metadata *commonv1.Metadata, spec []*streamv1.TagFamilySpec,
 ) (map[string]*elementsInGroup, error) {
        t := writeEvent.Request.Element.Timestamp.AsTime().Local()
        if err := timestamp.Check(t); err != nil {
                return nil, fmt.Errorf("invalid timestamp: %w", err)
        }
        ts := t.UnixNano()
-       eg, err := w.prepareElementsInGroup(dst, writeEvent, ts)
+       eg, err := w.prepareElementsInGroup(dst, metadata, ts)
        if err != nil {
                return nil, err
        }
@@ -83,15 +85,15 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
        if err != nil {
                return nil, err
        }
-       err = processElements(w.schemaRepo, et.elements, writeEvent, ts, 
&et.docs, &et.seriesDocs)
+       err = processElements(w.schemaRepo, et.elements, writeEvent, ts, 
&et.docs, &et.seriesDocs, metadata, spec)
        if err != nil {
                return nil, err
        }
        return dst, nil
 }
 
-func (w *writeCallback) prepareElementsInGroup(dst 
map[string]*elementsInGroup, writeEvent *streamv1.InternalWriteRequest, ts 
int64) (*elementsInGroup, error) {
-       gn := writeEvent.Request.Metadata.Group
+func (w *writeCallback) prepareElementsInGroup(dst 
map[string]*elementsInGroup, metadata *commonv1.Metadata, ts int64) 
(*elementsInGroup, error) {
+       gn := metadata.Group
        tsdb, err := w.schemaRepo.loadTSDB(gn)
        if err != nil {
                return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, 
err)
@@ -161,17 +163,17 @@ func (w *writeCallback) prepareElementsInTable(eg 
*elementsInGroup, writeEvent *
 }
 
 func processElements(schemaRepo *schemaRepo, elements *elements, writeEvent 
*streamv1.InternalWriteRequest,
-       ts int64, tableDocs *index.Documents, seriesDocs *seriesDoc,
+       ts int64, tableDocs *index.Documents, seriesDocs *seriesDoc, metadata 
*commonv1.Metadata, spec []*streamv1.TagFamilySpec,
 ) error {
        req := writeEvent.Request
 
        elements.timestamps = append(elements.timestamps, ts)
-       eID := convert.HashStr(req.Metadata.Group + "|" + req.Metadata.Name + 
"|" + req.Element.ElementId)
+       eID := convert.HashStr(metadata.Group + "|" + metadata.Name + "|" + 
req.Element.ElementId)
        elements.elementIDs = append(elements.elementIDs, eID)
 
-       stm, ok := schemaRepo.loadStream(writeEvent.GetRequest().GetMetadata())
+       stm, ok := schemaRepo.loadStream(metadata)
        if !ok {
-               return fmt.Errorf("cannot find stream definition: %s", 
writeEvent.GetRequest().GetMetadata())
+               return fmt.Errorf("cannot find stream definition: %s", metadata)
        }
 
        fLen := len(req.Element.GetTagFamilies())
@@ -179,11 +181,11 @@ func processElements(schemaRepo *schemaRepo, elements 
*elements, writeEvent *str
                return fmt.Errorf("%s has no tag family", req)
        }
        if fLen > len(stm.schema.GetTagFamilies()) {
-               return fmt.Errorf("%s has more tag families than %s", 
req.Metadata, stm.schema)
+               return fmt.Errorf("%s has more tag families than %s", metadata, 
stm.schema)
        }
 
        series := &pbv1.Series{
-               Subject:      req.Metadata.Name,
+               Subject:      metadata.Name,
                EntityValues: writeEvent.EntityValues,
        }
        if err := series.Marshal(); err != nil {
@@ -200,28 +202,37 @@ func processElements(schemaRepo *schemaRepo, elements 
*elements, writeEvent *str
                        len(is.indexRuleLocators.TagFamilyTRule), 
len(stm.GetSchema().GetTagFamilies()))
        }
 
+       specFamilyMap, specTagMaps := buildSpecMaps(spec)
+
        for i := range stm.GetSchema().GetTagFamilies() {
-               var tagFamily *modelv1.TagFamilyForWrite
-               if len(req.Element.TagFamilies) <= i {
-                       tagFamily = pbv1.NullTagFamily
-               } else {
-                       tagFamily = req.Element.TagFamilies[i]
-               }
-               tfr := is.indexRuleLocators.TagFamilyTRule[i]
                tagFamilySpec := stm.GetSchema().GetTagFamilies()[i]
+               srcFamily, specTagMap := getSrcFamilyAndTagMap(spec, 
tagFamilySpec, req, i, specFamilyMap, specTagMaps)
+               tfr := is.indexRuleLocators.TagFamilyTRule[i]
                tf := tagValues{
                        tag: tagFamilySpec.Name,
                }
 
                for j := range tagFamilySpec.Tags {
+                       t := tagFamilySpec.Tags[j]
+
                        var tagValue *modelv1.TagValue
-                       if tagFamily == pbv1.NullTagFamily || 
len(tagFamily.Tags) <= j {
-                               tagValue = pbv1.NullTagValue
+                       if spec != nil {
+                               if srcFamily != nil && specTagMap != nil {
+                                       if srcTagIdx, ok := specTagMap[t.Name]; 
ok && srcTagIdx < len(srcFamily.Tags) {
+                                               tagValue = 
srcFamily.Tags[srcTagIdx]
+                                       }
+                               }
+                               if tagValue == nil {
+                                       tagValue = pbv1.NullTagValue
+                               }
                        } else {
-                               tagValue = tagFamily.Tags[j]
+                               if srcFamily == nil || len(srcFamily.Tags) <= j 
{
+                                       tagValue = pbv1.NullTagValue
+                               } else {
+                                       tagValue = srcFamily.Tags[j]
+                               }
                        }
 
-                       t := tagFamilySpec.Tags[j]
                        indexed := false
                        if r, ok := tfr[t.Name]; ok && tagValue != 
pbv1.NullTagValue {
                                if r.GetType() == 
databasev1.IndexRule_TYPE_INVERTED {
@@ -277,6 +288,8 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
                return
        }
        groups := make(map[string]*elementsInGroup)
+       var metadata *commonv1.Metadata
+       var spec []*streamv1.TagFamilySpec
        for i := range events {
                var writeEvent *streamv1.InternalWriteRequest
                switch e := events[i].(type) {
@@ -292,8 +305,15 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
                        w.l.Warn().Msg("invalid event data type")
                        continue
                }
+               req := writeEvent.Request
+               if req != nil && req.GetMetadata() != nil {
+                       metadata = req.GetMetadata()
+               }
+               if req != nil && req.GetTagFamilySpec() != nil {
+                       spec = req.GetTagFamilySpec()
+               }
                var err error
-               if groups, err = w.handle(groups, writeEvent); err != nil {
+               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
                        w.l.Error().Err(err).Msg("cannot handle write event")
                        groups = make(map[string]*elementsInGroup)
                        continue
@@ -327,6 +347,41 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
        return
 }
 
+func buildSpecMaps(spec []*streamv1.TagFamilySpec) (map[string]int, 
map[string]map[string]int) {
+       if spec == nil {
+               return nil, nil
+       }
+       specFamilyMap := make(map[string]int)
+       specTagMaps := make(map[string]map[string]int)
+       for i, specFamily := range spec {
+               specFamilyMap[specFamily.GetName()] = i
+               tagMap := make(map[string]int)
+               for j, tagName := range specFamily.GetTagNames() {
+                       tagMap[tagName] = j
+               }
+               specTagMaps[specFamily.GetName()] = tagMap
+       }
+       return specFamilyMap, specTagMaps
+}
+
+func getSrcFamilyAndTagMap(spec []*streamv1.TagFamilySpec, tagFamilySpec 
*databasev1.TagFamilySpec,
+       req *streamv1.WriteRequest, i int, specFamilyMap map[string]int,
+       specTagMaps map[string]map[string]int,
+) (*modelv1.TagFamilyForWrite, map[string]int) {
+       var srcFamily *modelv1.TagFamilyForWrite
+       var specTagMap map[string]int
+       if spec != nil {
+               specIdx, ok := specFamilyMap[tagFamilySpec.Name]
+               if ok && specIdx < len(req.Element.TagFamilies) {
+                       srcFamily = req.Element.TagFamilies[specIdx]
+               }
+               specTagMap = specTagMaps[tagFamilySpec.Name]
+       } else if len(req.Element.TagFamilies) > i {
+               srcFamily = req.Element.TagFamilies[i]
+       }
+       return srcFamily, specTagMap
+}
+
 func encodeTagValue(name string, tagType databasev1.TagType, tagVal 
*modelv1.TagValue) *tagValue {
        tv := generateTagValue()
        tv.tag = name
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index 6bf27f41..a234cfab 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -26,6 +26,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
@@ -70,12 +71,15 @@ func (w *writeQueueCallback) CheckHealth() *common.Error {
        return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "disk 
usage is too high, stop writing")
 }
 
-func (w *writeQueueCallback) handle(dst map[string]*tracesInQueue, writeEvent 
*tracev1.InternalWriteRequest) (map[string]*tracesInQueue, error) {
-       stm, ok := w.schemaRepo.loadTrace(writeEvent.GetRequest().GetMetadata())
+func (w *writeQueueCallback) handle(dst map[string]*tracesInQueue, writeEvent 
*tracev1.InternalWriteRequest,
+       metadata *commonv1.Metadata, spec *tracev1.TagSpec,
+) (map[string]*tracesInQueue, error) {
+       stm, ok := w.schemaRepo.loadTrace(metadata)
        if !ok {
-               return nil, fmt.Errorf("cannot find trace definition: %s", 
writeEvent.GetRequest().GetMetadata())
+               return nil, fmt.Errorf("cannot find trace definition: %s", 
metadata)
        }
-       idx, err := getTagIndex(stm, stm.schema.TimestampTagName)
+       specMap := buildSpecMap(spec)
+       idx, err := getTagIndex(stm, stm.schema.TimestampTagName, specMap)
        if err != nil {
                return nil, err
        }
@@ -84,7 +88,7 @@ func (w *writeQueueCallback) handle(dst 
map[string]*tracesInQueue, writeEvent *t
                return nil, fmt.Errorf("invalid timestamp: %w", err)
        }
        ts := t.UnixNano()
-       eq, err := w.prepareElementsInQueue(dst, writeEvent)
+       eq, err := w.prepareElementsInQueue(dst, metadata)
        if err != nil {
                return nil, err
        }
@@ -92,15 +96,15 @@ func (w *writeQueueCallback) handle(dst 
map[string]*tracesInQueue, writeEvent *t
        if err != nil {
                return nil, err
        }
-       err = processTraces(w.schemaRepo, et, writeEvent)
+       err = processTraces(w.schemaRepo, et, writeEvent, metadata, spec)
        if err != nil {
                return nil, err
        }
        return dst, nil
 }
 
-func (w *writeQueueCallback) prepareElementsInQueue(dst 
map[string]*tracesInQueue, writeEvent *tracev1.InternalWriteRequest) 
(*tracesInQueue, error) {
-       gn := writeEvent.Request.Metadata.Group
+func (w *writeQueueCallback) prepareElementsInQueue(dst 
map[string]*tracesInQueue, metadata *commonv1.Metadata) (*tracesInQueue, error) 
{
+       gn := metadata.Group
        queue, err := w.schemaRepo.loadQueue(gn)
        if err != nil {
                return nil, fmt.Errorf("cannot load queue for group %s: %w", 
gn, err)
@@ -165,6 +169,8 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                return
        }
        groups := make(map[string]*tracesInQueue)
+       var metadata *commonv1.Metadata
+       var spec *tracev1.TagSpec
        for i := range events {
                var writeEvent *tracev1.InternalWriteRequest
                switch e := events[i].(type) {
@@ -180,8 +186,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                        w.l.Warn().Msg("invalid event data type")
                        continue
                }
+               req := writeEvent.Request
+               if req != nil && req.GetMetadata() != nil {
+                       metadata = req.GetMetadata()
+               }
+               if req != nil && req.GetTagSpec() != nil {
+                       spec = req.GetTagSpec()
+               }
                var err error
-               if groups, err = w.handle(groups, writeEvent); err != nil {
+               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
                        w.l.Error().Err(err).Msg("cannot handle write event")
                        groups = make(map[string]*tracesInQueue)
                        continue
diff --git a/banyand/trace/write_standalone.go 
b/banyand/trace/write_standalone.go
index 18e09722..d7057662 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -26,6 +26,7 @@ import (
        "google.golang.org/protobuf/proto"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
@@ -70,12 +71,14 @@ func (w *writeCallback) CheckHealth() *common.Error {
 }
 
 func (w *writeCallback) handle(dst map[string]*tracesInGroup, writeEvent 
*tracev1.InternalWriteRequest,
+       metadata *commonv1.Metadata, spec *tracev1.TagSpec,
 ) (map[string]*tracesInGroup, error) {
-       stm, ok := w.schemaRepo.loadTrace(writeEvent.GetRequest().GetMetadata())
+       stm, ok := w.schemaRepo.loadTrace(metadata)
        if !ok {
-               return nil, fmt.Errorf("cannot find trace definition: %s", 
writeEvent.GetRequest().GetMetadata())
+               return nil, fmt.Errorf("cannot find trace definition: %s", 
metadata)
        }
-       idx, err := getTagIndex(stm, stm.schema.TimestampTagName)
+       specMap := buildSpecMap(spec)
+       idx, err := getTagIndex(stm, stm.schema.TimestampTagName, specMap)
        if err != nil {
                return nil, err
        }
@@ -84,7 +87,7 @@ func (w *writeCallback) handle(dst map[string]*tracesInGroup, 
writeEvent *tracev
                return nil, fmt.Errorf("invalid timestamp: %w", err)
        }
        ts := t.UnixNano()
-       eg, err := w.prepareTracesInGroup(dst, writeEvent, ts)
+       eg, err := w.prepareTracesInGroup(dst, metadata, ts)
        if err != nil {
                return nil, err
        }
@@ -92,15 +95,15 @@ func (w *writeCallback) handle(dst 
map[string]*tracesInGroup, writeEvent *tracev
        if err != nil {
                return nil, err
        }
-       err = processTraces(w.schemaRepo, et, writeEvent)
+       err = processTraces(w.schemaRepo, et, writeEvent, metadata, spec)
        if err != nil {
                return nil, err
        }
        return dst, nil
 }
 
-func (w *writeCallback) prepareTracesInGroup(dst map[string]*tracesInGroup, 
writeEvent *tracev1.InternalWriteRequest, ts int64) (*tracesInGroup, error) {
-       gn := writeEvent.Request.Metadata.Group
+func (w *writeCallback) prepareTracesInGroup(dst map[string]*tracesInGroup, 
metadata *commonv1.Metadata, ts int64) (*tracesInGroup, error) {
+       gn := metadata.Group
        tsdb, err := w.schemaRepo.loadTSDB(gn)
        if err != nil {
                return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, 
err)
@@ -170,15 +173,15 @@ func (w *writeCallback) prepareTracesInTable(eg 
*tracesInGroup, writeEvent *trac
        return et, nil
 }
 
-func extractTraceSpanInfo(stm *trace, tracesInTable *tracesInTable, req 
*tracev1.WriteRequest) (string, error) {
-       idx, err := getTagIndex(stm, stm.schema.TraceIdTagName)
+func extractTraceSpanInfo(stm *trace, tracesInTable *tracesInTable, req 
*tracev1.WriteRequest, specMap map[string]int) (string, error) {
+       idx, err := getTagIndex(stm, stm.schema.TraceIdTagName, specMap)
        if err != nil {
                return "", err
        }
        traceID := req.Tags[idx].GetStr().GetValue()
        tracesInTable.traces.traceIDs = append(tracesInTable.traces.traceIDs, 
traceID)
 
-       idx, err = getTagIndex(stm, stm.schema.SpanIdTagName)
+       idx, err = getTagIndex(stm, stm.schema.SpanIdTagName, specMap)
        if err != nil {
                return "", err
        }
@@ -189,13 +192,19 @@ func extractTraceSpanInfo(stm *trace, tracesInTable 
*tracesInTable, req *tracev1
        return traceID, nil
 }
 
-func validateTags(stm *trace, req *tracev1.WriteRequest) error {
+func validateTags(stm *trace, req *tracev1.WriteRequest, spec 
*tracev1.TagSpec) error {
        tLen := len(req.GetTags())
        if tLen < 1 {
                return fmt.Errorf("%s has no tag family", req)
        }
-       if tLen > len(stm.schema.GetTags()) {
-               return fmt.Errorf("%s has more tag than %s", req.Metadata, 
stm.schema)
+       if spec != nil {
+               if tLen > len(spec.GetTagNames()) {
+                       return fmt.Errorf("request has more tags than spec: %d 
> %d", tLen, len(spec.GetTagNames()))
+               }
+       } else {
+               if tLen > len(stm.schema.GetTags()) {
+                       return fmt.Errorf("%s has more tag than %s", 
req.Metadata, stm.schema)
+               }
        }
 
        is := stm.indexSchema.Load().(indexSchema)
@@ -207,7 +216,7 @@ func validateTags(stm *trace, req *tracev1.WriteRequest) 
error {
        return nil
 }
 
-func buildTagsAndMap(stm *trace, tracesInTable *tracesInTable, req 
*tracev1.WriteRequest) ([]*tagValue, map[string]*tagValue) {
+func buildTagsAndMap(stm *trace, tracesInTable *tracesInTable, req 
*tracev1.WriteRequest, specMap map[string]int) ([]*tagValue, 
map[string]*tagValue) {
        tags := make([]*tagValue, 0, len(stm.schema.Tags))
        tagMap := make(map[string]*tagValue, len(stm.schema.Tags))
        tagSpecs := stm.GetSchema().GetTags()
@@ -217,15 +226,15 @@ func buildTagsAndMap(stm *trace, tracesInTable 
*tracesInTable, req *tracev1.Writ
                if tagSpec.Name == stm.schema.TraceIdTagName || tagSpec.Name == 
stm.schema.SpanIdTagName {
                        continue
                }
-               if tagSpec.Name == stm.schema.TimestampTagName {
-                       tracesInTable.traces.timestamps = 
append(tracesInTable.traces.timestamps, 
req.Tags[i].GetTimestamp().AsTime().UnixNano())
-               }
-
+               tagIdx, err := getTagIndex(stm, tagSpec.Name, specMap)
                var tagValue *modelv1.TagValue
-               if len(req.Tags) <= i {
+               if err != nil || tagIdx >= len(req.Tags) {
                        tagValue = pbv1.NullTagValue
                } else {
-                       tagValue = req.Tags[i]
+                       tagValue = req.Tags[tagIdx]
+               }
+               if tagSpec.Name == stm.schema.TimestampTagName && tagValue != 
pbv1.NullTagValue {
+                       tracesInTable.traces.timestamps = 
append(tracesInTable.traces.timestamps, 
tagValue.GetTimestamp().AsTime().UnixNano())
                }
                tv := encodeTagValue(tagSpec.Name, tagSpec.Type, tagValue)
                tags = append(tags, tv)
@@ -256,11 +265,13 @@ func buildSidxTags(tags []*tagValue) []sidx.Tag {
        return sidxTags
 }
 
-func processIndexRules(stm *trace, tracesInTable *tracesInTable, req 
*tracev1.WriteRequest, traceID string, tagMap map[string]*tagValue, sidxTags 
[]sidx.Tag) error {
+func processIndexRules(stm *trace, tracesInTable *tracesInTable, req 
*tracev1.WriteRequest,
+       traceID string, tagMap map[string]*tagValue, sidxTags []sidx.Tag, 
metadata *commonv1.Metadata, specMap map[string]int,
+) error {
        indexRules := stm.GetIndexRules()
        for _, indexRule := range indexRules {
                tagName := indexRule.Tags[len(indexRule.Tags)-1]
-               tagIdx, err := getTagIndex(stm, tagName)
+               tagIdx, err := getTagIndex(stm, tagName, specMap)
                if err != nil || tagIdx >= len(req.Tags) {
                        continue
                }
@@ -283,7 +294,7 @@ func processIndexRules(stm *trace, tracesInTable 
*tracesInTable, req *tracev1.Wr
 
                entityValues := make([]*modelv1.TagValue, 0, 
len(indexRule.Tags))
                for i, tagName := range indexRule.Tags {
-                       tagIdx, err := getTagIndex(stm, tagName)
+                       tagIdx, err := getTagIndex(stm, tagName, specMap)
                        if err != nil || tagIdx >= len(req.Tags) {
                                continue
                        }
@@ -294,7 +305,7 @@ func processIndexRules(stm *trace, tracesInTable 
*tracesInTable, req *tracev1.Wr
                }
 
                series := &pbv1.Series{
-                       Subject:      req.Metadata.Name,
+                       Subject:      metadata.Name,
                        EntityValues: entityValues,
                }
                if err := series.Marshal(); err != nil {
@@ -348,26 +359,25 @@ func processIndexRules(stm *trace, tracesInTable 
*tracesInTable, req *tracev1.Wr
        return nil
 }
 
-func processTraces(schemaRepo *schemaRepo, tracesInTable *tracesInTable, 
writeEvent *tracev1.InternalWriteRequest) error {
+func processTraces(schemaRepo *schemaRepo, tracesInTable *tracesInTable, 
writeEvent *tracev1.InternalWriteRequest,
+       metadata *commonv1.Metadata, spec *tracev1.TagSpec,
+) error {
        req := writeEvent.Request
-       stm, ok := schemaRepo.loadTrace(req.GetMetadata())
+       stm, ok := schemaRepo.loadTrace(metadata)
        if !ok {
-               return fmt.Errorf("cannot find trace definition: %s", 
req.GetMetadata())
+               return fmt.Errorf("cannot find trace definition: %s", metadata)
        }
-
-       traceID, err := extractTraceSpanInfo(stm, tracesInTable, req)
+       specMap := buildSpecMap(spec)
+       traceID, err := extractTraceSpanInfo(stm, tracesInTable, req, specMap)
        if err != nil {
                return err
        }
-
-       if err := validateTags(stm, req); err != nil {
+       if err := validateTags(stm, req, spec); err != nil {
                return err
        }
-
-       tags, tagMap := buildTagsAndMap(stm, tracesInTable, req)
+       tags, tagMap := buildTagsAndMap(stm, tracesInTable, req, specMap)
        sidxTags := buildSidxTags(tags)
-
-       return processIndexRules(stm, tracesInTable, req, traceID, tagMap, 
sidxTags)
+       return processIndexRules(stm, tracesInTable, req, traceID, tagMap, 
sidxTags, metadata, specMap)
 }
 
 func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp 
bus.Message) {
@@ -381,6 +391,8 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
                return
        }
        groups := make(map[string]*tracesInGroup)
+       var metadata *commonv1.Metadata
+       var spec *tracev1.TagSpec
        for i := range events {
                var writeEvent *tracev1.InternalWriteRequest
                switch e := events[i].(type) {
@@ -396,8 +408,15 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
                        w.l.Warn().Msg("invalid event data type")
                        continue
                }
+               req := writeEvent.Request
+               if req != nil && req.GetMetadata() != nil {
+                       metadata = req.GetMetadata()
+               }
+               if req != nil && req.GetTagSpec() != nil {
+                       spec = req.GetTagSpec()
+               }
                var err error
-               if groups, err = w.handle(groups, writeEvent); err != nil {
+               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
                        w.l.Error().Err(err).Msg("cannot handle write event")
                        groups = make(map[string]*tracesInGroup)
                        continue
@@ -444,6 +463,32 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
        return
 }
 
+func buildSpecMap(spec *tracev1.TagSpec) map[string]int {
+       if spec == nil {
+               return nil
+       }
+       specMap := make(map[string]int, 0)
+       for i, name := range spec.GetTagNames() {
+               specMap[name] = i
+       }
+       return specMap
+}
+
+func getTagIndex(trace *trace, name string, specMap map[string]int) (int, 
error) {
+       if specMap != nil {
+               if idx, ok := specMap[name]; ok {
+                       return idx, nil
+               }
+               return -1, fmt.Errorf("tag %s not found in spec", name)
+       }
+       for i, tag := range trace.schema.Tags {
+               if tag.Name == name {
+                       return i, nil
+               }
+       }
+       return -1, fmt.Errorf("tag %s not found in trace %s", name, trace.name)
+}
+
 func encodeTagValue(name string, tagType databasev1.TagType, tagVal 
*modelv1.TagValue) *tagValue {
        tv := generateTagValue()
        tv.tag = name
@@ -494,12 +539,3 @@ func encodeTagValue(name string, tagType 
databasev1.TagType, tagVal *modelv1.Tag
        }
        return tv
 }
-
-func getTagIndex(trace *trace, name string) (int, error) {
-       for i, tag := range trace.schema.Tags {
-               if tag.Name == name {
-                       return i, nil
-               }
-       }
-       return -1, fmt.Errorf("tag %s not found in trace %s", name, trace.name)
-}
diff --git a/pkg/test/stream/testdata/group.json 
b/pkg/test/stream/testdata/group.json
index e4116dd7..8e312f1c 100644
--- a/pkg/test/stream/testdata/group.json
+++ b/pkg/test/stream/testdata/group.json
@@ -18,6 +18,25 @@
     },
     "updated_at": "2021-04-15T01:30:15.01Z"
   },
+  {
+    "metadata": {
+      "name": "default-spec"
+    },
+    "catalog": "CATALOG_STREAM",
+    "resource_opts": {
+      "shard_num": 2,
+      "replicas": 1,
+      "segment_interval": {
+        "unit": "UNIT_DAY",
+        "num": 1
+      },
+      "ttl": {
+        "unit": "UNIT_DAY",
+        "num": 3
+      }
+    },
+    "updated_at": "2021-04-15T01:30:15.01Z"
+  },
   {
     "metadata": {
       "name": "updated"
@@ -36,4 +55,4 @@
     },
     "updated_at": "2021-04-15T01:30:15.01Z"
   }
-]
\ No newline at end of file
+]
diff --git a/pkg/test/stream/testdata/group_with_stages.json 
b/pkg/test/stream/testdata/group_with_stages.json
index b5aad645..749bde93 100644
--- a/pkg/test/stream/testdata/group_with_stages.json
+++ b/pkg/test/stream/testdata/group_with_stages.json
@@ -64,5 +64,38 @@
       ]
     },
     "updated_at": "2021-04-15T01:30:15.01Z"
+  },
+  {
+    "metadata": {
+      "name": "default-spec"
+    },
+    "catalog": "CATALOG_STREAM",
+    "resource_opts": {
+      "shard_num": 2,
+      "segment_interval": {
+        "unit": "UNIT_DAY",
+        "num": 1
+      },
+      "ttl": {
+        "unit": "UNIT_DAY",
+        "num": 3
+      },
+      "stages": [
+        {
+          "name": "warm",
+          "shard_num": 1,
+          "segment_interval": {
+            "unit": "UNIT_DAY",
+            "num": 3
+          },
+          "ttl": {
+            "unit": "UNIT_DAY",
+            "num": 7
+          },
+          "node_selector": "type=warm"
+        }
+      ]
+    },
+    "updated_at": "2021-04-15T01:30:15.01Z"
   }
 ]
\ No newline at end of file
diff --git a/pkg/test/stream/testdata/index_rule_bindings/sw_spec.json 
b/pkg/test/stream/testdata/index_rule_bindings/sw_spec.json
new file mode 100644
index 00000000..ba14f693
--- /dev/null
+++ b/pkg/test/stream/testdata/index_rule_bindings/sw_spec.json
@@ -0,0 +1,27 @@
+{
+  "metadata": {
+    "name": "sw-spec-index-rule-binding",
+    "group": "default-spec"
+  },
+  "rules": [
+    "trace_id",
+    "duration",
+    "endpoint_id",
+    "status_code",
+    "http.method",
+    "db.instance",
+    "db.type",
+    "mq.broker",
+    "mq.queue",
+    "mq.topic",
+    "extended_tags"
+  ],
+  "subject": {
+    "catalog": "CATALOG_STREAM",
+    "name": "sw"
+  },
+  "begin_at": "2021-04-15T01:30:15.01Z",
+  "expire_at": "2121-04-15T01:30:15.01Z",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
+
diff --git a/pkg/test/stream/testdata/streams/sw_spec.json 
b/pkg/test/stream/testdata/streams/sw_spec.json
new file mode 100644
index 00000000..31b7b58f
--- /dev/null
+++ b/pkg/test/stream/testdata/streams/sw_spec.json
@@ -0,0 +1,95 @@
+{
+  "metadata": {
+    "name": "sw",
+    "group": "default-spec"
+  },
+  "tag_families": [
+    {
+      "name": "data",
+      "tags": [
+        {
+          "name": "data_binary",
+          "type": "TAG_TYPE_DATA_BINARY"
+        }
+      ]
+    },
+    {
+      "name": "searchable",
+      "tags": [
+        {
+          "name": "trace_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "state",
+          "type": "TAG_TYPE_INT"
+        },
+        {
+          "name": "service_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "service_instance_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "endpoint_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "duration",
+          "type": "TAG_TYPE_INT"
+        },
+        {
+          "name": "start_time",
+          "type": "TAG_TYPE_INT"
+        },
+        {
+          "name": "http.method",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "status_code",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "span_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "db.type",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "db.instance",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "mq.queue",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "mq.topic",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "mq.broker",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "extended_tags",
+          "type": "TAG_TYPE_STRING_ARRAY"
+        },
+        {
+          "name": "non_indexed_tags",
+          "type": "TAG_TYPE_STRING_ARRAY"
+        }
+      ]
+    }
+  ],
+  "entity": {
+    "tag_names": ["service_id", "service_instance_id", "state"]
+  },
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
+
diff --git a/pkg/test/trace/testdata/groups/test-trace-spec.json 
b/pkg/test/trace/testdata/groups/test-trace-spec.json
new file mode 100644
index 00000000..29c24dd0
--- /dev/null
+++ b/pkg/test/trace/testdata/groups/test-trace-spec.json
@@ -0,0 +1,19 @@
+{
+    "metadata": {
+        "name": "test-trace-spec"
+    },
+    "catalog": "CATALOG_TRACE",
+    "resource_opts": {
+        "shard_num": 2,
+        "replicas": 0,
+        "segment_interval": {
+            "unit": "UNIT_DAY",
+            "num": 1
+        },
+        "ttl": {
+            "unit": "UNIT_DAY",
+            "num": 3
+        }
+    },
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/pkg/test/trace/testdata/groups_stages/test-trace-spec.json 
b/pkg/test/trace/testdata/groups_stages/test-trace-spec.json
new file mode 100644
index 00000000..8cd0290f
--- /dev/null
+++ b/pkg/test/trace/testdata/groups_stages/test-trace-spec.json
@@ -0,0 +1,34 @@
+{
+  "metadata": {
+    "name": "test-trace-spec"
+  },
+  "catalog": "CATALOG_TRACE",
+  "resource_opts": {
+    "shard_num": 2,
+    "replicas": 0,
+    "segment_interval": {
+      "unit": "UNIT_DAY",
+      "num": 1
+    },
+    "ttl": {
+      "unit": "UNIT_DAY",
+      "num": 3
+    },
+    "stages": [
+      {
+        "name": "warm",
+        "shard_num": 2,
+        "segment_interval": {
+          "unit": "UNIT_DAY",
+          "num": 3
+        },
+        "ttl": {
+          "unit": "UNIT_DAY",
+          "num": 7
+        },
+        "node_selector": "type=warm"
+      }
+    ]
+  },
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rule_bindings/sw_spec.json 
b/pkg/test/trace/testdata/index_rule_bindings/sw_spec.json
new file mode 100644
index 00000000..810c1612
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rule_bindings/sw_spec.json
@@ -0,0 +1,14 @@
+{
+  "metadata": {
+    "name": "sw-spec-index-rule-binding",
+    "group": "test-trace-spec"
+  },
+  "rules": ["duration", "timestamp"],
+  "subject": {
+    "catalog": "CATALOG_TRACE",
+    "name": "sw"
+  },
+  "begin_at": "2021-04-15T01:30:15.01Z",
+  "expire_at": "2121-04-15T01:30:15.01Z",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rules/duration_spec.json 
b/pkg/test/trace/testdata/index_rules/duration_spec.json
new file mode 100644
index 00000000..a89781ee
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rules/duration_spec.json
@@ -0,0 +1,14 @@
+{
+    "metadata": {
+        "name": "duration",
+        "group": "test-trace-spec"
+    },
+    "tags": [
+        "service_id",
+        "service_instance_id",
+        "state",
+        "duration"
+    ],
+    "type": "TYPE_TREE",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rules/timestamp_spec.json 
b/pkg/test/trace/testdata/index_rules/timestamp_spec.json
new file mode 100644
index 00000000..ba1a0e27
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rules/timestamp_spec.json
@@ -0,0 +1,14 @@
+{
+    "metadata": {
+        "name": "timestamp",
+        "group": "test-trace-spec"
+    },
+    "tags": [
+        "service_id",
+        "service_instance_id",
+        "state",
+        "timestamp"
+    ],
+    "type": "TYPE_TREE",
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/traces/sw_spec.json 
b/pkg/test/trace/testdata/traces/sw_spec.json
new file mode 100644
index 00000000..9b0841b7
--- /dev/null
+++ b/pkg/test/trace/testdata/traces/sw_spec.json
@@ -0,0 +1,45 @@
+{
+  "metadata": {
+    "name": "sw",
+    "group": "test-trace-spec"
+  },
+  "tags": [
+    {
+      "name": "trace_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "state",
+      "type": "TAG_TYPE_INT"
+    },
+    {
+      "name": "service_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "service_instance_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "endpoint_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "duration",
+      "type": "TAG_TYPE_INT"
+    },
+    {
+      "name": "span_id",
+      "type": "TAG_TYPE_STRING"
+    },
+    {
+      "name": "timestamp",
+      "type": "TAG_TYPE_TIMESTAMP"
+    }
+  ],
+  "trace_id_tag_name": "trace_id",
+  "span_id_tag_name": "span_id",
+  "timestamp_tag_name": "timestamp",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
+
diff --git a/test/cases/init.go b/test/cases/init.go
index dc66b595..8cc611e9 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -26,6 +26,8 @@ import (
        "google.golang.org/grpc/credentials/insecure"
 
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        casesmeasuredata 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
        caseproperty 
"github.com/apache/skywalking-banyandb/test/cases/property/data"
@@ -43,6 +45,33 @@ func Initialize(addr string, now time.Time) {
        casesstreamdata.Write(conn, "sw", now, interval)
        casesstreamdata.Write(conn, "duplicated", now, 0)
        casesstreamdata.WriteToGroup(conn, "sw", "updated", "sw_updated", 
now.Add(time.Minute), interval)
+       casesstreamdata.WriteWithSpec(conn, "sw", "default-spec", 
now.Add(2*time.Minute), interval,
+               casesstreamdata.SpecWithData{
+                       Spec: []*streamv1.TagFamilySpec{
+                               {
+                                       Name:     "data",
+                                       TagNames: []string{"data_binary"},
+                               },
+                               {
+                                       Name:     "searchable",
+                                       TagNames: []string{"trace_id", "state", 
"service_id", "service_instance_id", "endpoint_id", "duration", "start_time", 
"http.method", "status_code", "span_id"},
+                               },
+                       },
+                       DataFile: "sw_spec_order.json",
+               },
+               casesstreamdata.SpecWithData{
+                       Spec: []*streamv1.TagFamilySpec{
+                               {
+                                       Name:     "searchable",
+                                       TagNames: []string{"span_id", 
"status_code", "http.method", "duration", "state", "endpoint_id", 
"service_instance_id", "start_time", "service_id", "trace_id"},
+                               },
+                               {
+                                       Name:     "data",
+                                       TagNames: []string{"data_binary"},
+                               },
+                       },
+                       DataFile: "sw_spec_order2.json",
+               })
        // measure
        interval = time.Minute
        casesmeasuredata.Write(conn, "service_traffic", "index_mode", 
"service_traffic_data_old.json", now.AddDate(0, 0, -2), interval)
@@ -107,6 +136,19 @@ func Initialize(addr string, now time.Time) {
        casestrace.WriteToGroup(conn, "sw", "test-trace-updated", "sw_updated", 
now.Add(time.Minute), interval)
        time.Sleep(5 * time.Second)
        casestrace.WriteToGroup(conn, "sw", "test-trace-group", 
"sw_mixed_traces", now.Add(time.Minute), interval)
+       casestrace.WriteWithSpec(conn, "sw", "test-trace-spec", 
now.Add(2*time.Minute), interval,
+               casestrace.SpecWithData{
+                       Spec: &tracev1.TagSpec{
+                               TagNames: []string{"trace_id", "state", 
"service_id", "service_instance_id", "endpoint_id", "duration", "span_id", 
"timestamp"},
+                       },
+                       DataFile: "sw_spec_order.json",
+               },
+               casestrace.SpecWithData{
+                       Spec: &tracev1.TagSpec{
+                               TagNames: []string{"span_id", "duration", 
"endpoint_id", "service_instance_id", "service_id", "state", "trace_id", 
"timestamp"},
+                       },
+                       DataFile: "sw_spec_order2.json",
+               })
        // property
        caseproperty.Write(conn, "sw1")
        caseproperty.Write(conn, "sw2")
diff --git a/test/cases/stream/data/data.go b/test/cases/stream/data/data.go
index d99361ba..eb333960 100644
--- a/test/cases/stream/data/data.go
+++ b/test/cases/stream/data/data.go
@@ -274,3 +274,71 @@ func WriteToGroup(conn *grpclib.ClientConn, name, group, 
fileName string, baseTi
                return err
        }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
 }
+
+// SpecWithData pairs a TagFamilySpec with a data file.
+type SpecWithData struct {
+       DataFile string
+       Spec     []*streamv1.TagFamilySpec
+}
+
+// WriteWithSpec writes stream data using multiple tag_family_specs to specify 
tag names.
+func WriteWithSpec(conn *grpclib.ClientConn, name, group string,
+       baseTime time.Time, interval time.Duration, specDataPairs 
...SpecWithData,
+) {
+       ctx := context.Background()
+       md := &commonv1.Metadata{
+               Name:  name,
+               Group: group,
+       }
+
+       schemaClient := databasev1.NewStreamRegistryServiceClient(conn)
+       resp, err := schemaClient.Get(ctx, 
&databasev1.StreamRegistryServiceGetRequest{Metadata: md})
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       md = resp.GetStream().GetMetadata()
+
+       c := streamv1.NewStreamServiceClient(conn)
+       writeClient, err := c.Write(ctx)
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+
+       isFirstRequest := true
+       elementCounter := 0
+       currentTime := baseTime
+       for _, pair := range specDataPairs {
+               var templates []interface{}
+               content, err := dataFS.ReadFile("testdata/" + pair.DataFile)
+               gm.Expect(err).ShouldNot(gm.HaveOccurred())
+               gm.Expect(json.Unmarshal(content, 
&templates)).ShouldNot(gm.HaveOccurred())
+
+               isFirstForSpec := true
+               for i, template := range templates {
+                       rawElementValue, errMarshal := json.Marshal(template)
+                       gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
+                       elementValue := &streamv1.ElementValue{}
+                       gm.Expect(protojson.Unmarshal(rawElementValue, 
elementValue)).ShouldNot(gm.HaveOccurred())
+                       elementValue.ElementId = strconv.Itoa(elementCounter)
+                       elementValue.Timestamp = 
timestamppb.New(currentTime.Add(time.Duration(i) * interval))
+
+                       req := &streamv1.WriteRequest{
+                               Element:   elementValue,
+                               MessageId: uint64(time.Now().UnixNano()),
+                       }
+                       if isFirstRequest {
+                               req.Metadata = md
+                               isFirstRequest = false
+                       }
+                       if isFirstForSpec {
+                               req.TagFamilySpec = pair.Spec
+                               isFirstForSpec = false
+                       }
+                       gm.Expect(writeClient.Send(req)).Should(gm.Succeed())
+                       elementCounter++
+               }
+               currentTime = currentTime.Add(time.Duration(len(templates)) * 
interval)
+       }
+
+       gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
+       gm.Eventually(func() error {
+               _, err := writeClient.Recv()
+               return err
+       }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
+}
diff --git a/test/cases/stream/data/input/write_spec.ql 
b/test/cases/stream/data/input/write_spec.ql
new file mode 100644
index 00000000..f057350a
--- /dev/null
+++ b/test/cases/stream/data/input/write_spec.ql
@@ -0,0 +1,21 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT trace_id, state, duration, status_code, span_id FROM STREAM sw IN 
default-spec
+TIME > '-15m'
+WHERE trace_id IN ('spec_trace_2', 'spec_trace_5')
diff --git a/test/cases/stream/data/input/write_spec.yaml 
b/test/cases/stream/data/input/write_spec.yaml
new file mode 100644
index 00000000..00fd8854
--- /dev/null
+++ b/test/cases/stream/data/input/write_spec.yaml
@@ -0,0 +1,31 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["default-spec"]
+projection:
+  tagFamilies:
+    - name: "searchable"
+      tags: ["trace_id", "state", "duration", "status_code", "span_id"]
+criteria:
+  condition:
+    name: "trace_id"
+    op: "BINARY_OP_IN"
+    value:
+      strArray:
+        value: ["spec_trace_2", "spec_trace_5"]
+
diff --git a/test/cases/stream/data/testdata/sw_spec_order.json 
b/test/cases/stream/data/testdata/sw_spec_order.json
new file mode 100644
index 00000000..5e96c4c3
--- /dev/null
+++ b/test/cases/stream/data/testdata/sw_spec_order.json
@@ -0,0 +1,66 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [{ "binaryData": "YWJjMTIzIT8kKiYoKSctPUB+" }]
+      },
+      {
+        "tags": [
+          { "str": { "value": "spec_trace_1" } },
+          { "int": { "value": 0 } },
+          { "str": { "value": "webapp_id" } },
+          { "str": { "value": "10.0.0.1_id" } },
+          { "str": { "value": "/home_id" } },
+          { "int": { "value": 100 } },
+          { "int": { "value": 1622933202000000000 } },
+          { "str": { "value": "GET" } },
+          { "str": { "value": "200" } },
+          { "str": { "value": "spec_span_1" } }
+        ]
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [{ "binaryData": "YWJjMTIzIT8kKiYoKSctPUB+" }]
+      },
+      {
+        "tags": [
+          { "str": { "value": "spec_trace_2" } },
+          { "int": { "value": 1 } },
+          { "str": { "value": "webapp_id" } },
+          { "str": { "value": "10.0.0.2_id" } },
+          { "str": { "value": "/product_id" } },
+          { "int": { "value": 200 } },
+          { "int": { "value": 1622933202000000000 } },
+          { "str": { "value": "POST" } },
+          { "str": { "value": "201" } },
+          { "str": { "value": "spec_span_2" } }
+        ]
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [{ "binaryData": "YWJjMTIzIT8kKiYoKSctPUB+" }]
+      },
+      {
+        "tags": [
+          { "str": { "value": "spec_trace_3" } },
+          { "int": { "value": 0 } },
+          { "str": { "value": "webapp_id" } },
+          { "str": { "value": "10.0.0.3_id" } },
+          { "str": { "value": "/item_id" } },
+          { "int": { "value": 300 } },
+          { "int": { "value": 1622933202000000000 } },
+          { "str": { "value": "PUT" } },
+          { "str": { "value": "202" } },
+          { "str": { "value": "spec_span_3" } }
+        ]
+      }
+    ]
+  }
+]
+
diff --git a/test/cases/stream/data/testdata/sw_spec_order2.json 
b/test/cases/stream/data/testdata/sw_spec_order2.json
new file mode 100644
index 00000000..15b52fb0
--- /dev/null
+++ b/test/cases/stream/data/testdata/sw_spec_order2.json
@@ -0,0 +1,45 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          { "str": { "value": "spec_span_4" } },
+          { "str": { "value": "204" } },
+          { "str": { "value": "DELETE" } },
+          { "int": { "value": 400 } },
+          { "int": { "value": 1 } },
+          { "str": { "value": "/order_id" } },
+          { "str": { "value": "10.0.0.4_id" } },
+          { "int": { "value": 1622933202000000000 } },
+          { "str": { "value": "webapp_id" } },
+          { "str": { "value": "spec_trace_4" } }
+        ]
+      },
+      {
+        "tags": [{ "binaryData": "YWJjMTIzIT8kKiYoKSctPUB+" }]
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          { "str": { "value": "spec_span_5" } },
+          { "str": { "value": "200" } },
+          { "str": { "value": "PATCH" } },
+          { "int": { "value": 500 } },
+          { "int": { "value": 0 } },
+          { "str": { "value": "/cart_id" } },
+          { "str": { "value": "10.0.0.5_id" } },
+          { "int": { "value": 1622933202000000000 } },
+          { "str": { "value": "webapp_id" } },
+          { "str": { "value": "spec_trace_5" } }
+        ]
+      },
+      {
+        "tags": [{ "binaryData": "YWJjMTIzIT8kKiYoKSctPUB+" }]
+      }
+    ]
+  }
+]
+
diff --git a/test/cases/stream/data/want/write_spec.yaml 
b/test/cases/stream/data/want/write_spec.yaml
new file mode 100644
index 00000000..a40da633
--- /dev/null
+++ b/test/cases/stream/data/want/write_spec.yaml
@@ -0,0 +1,65 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+elements:
+  - tagFamilies:
+      - name: searchable
+        tags:
+          - key: trace_id
+            value:
+              str:
+                value: spec_trace_2
+          - key: state
+            value:
+              int:
+                value: "1"
+          - key: duration
+            value:
+              int:
+                value: "200"
+          - key: status_code
+            value:
+              str:
+                value: "201"
+          - key: span_id
+            value:
+              str:
+                value: spec_span_2
+  - tagFamilies:
+      - name: searchable
+        tags:
+          - key: trace_id
+            value:
+              str:
+                value: spec_trace_5
+          - key: state
+            value:
+              int:
+                value: "0"
+          - key: duration
+            value:
+              int:
+                value: "500"
+          - key: status_code
+            value:
+              str:
+                value: "200"
+          - key: span_id
+            value:
+              str:
+                value: spec_span_5
+
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index c0715df0..53e965cb 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -91,4 +91,5 @@ var _ = g.DescribeTable("Scanning Streams", func(args 
helpers.Args) {
        g.Entry("multi-groups: sort duration", helpers.Args{Input: 
"multi_group_sort_duration", Duration: 1 * time.Hour, IgnoreElementID: true}),
        g.Entry("hybrid index", helpers.Args{Input: "hybrid_index", Duration: 1 
* time.Hour, IgnoreElementID: true}),
        g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * 
time.Hour, WantErr: true}),
+       g.Entry("write spec", helpers.Args{Input: "write_spec", Duration: 1 * 
time.Hour, IgnoreElementID: true}),
 )
diff --git a/test/cases/trace/data/data.go b/test/cases/trace/data/data.go
index b4d8f599..0ace3f05 100644
--- a/test/cases/trace/data/data.go
+++ b/test/cases/trace/data/data.go
@@ -327,6 +327,94 @@ func WriteToGroup(conn *grpclib.ClientConn, name, group, 
fileName string, baseTi
        }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
 }
 
+// SpecWithData pairs a TagSpec with a data file.
+type SpecWithData struct {
+       Spec     *tracev1.TagSpec
+       DataFile string
+}
+
+// WriteWithSpec writes trace data using tag_spec to specify tag names.
+func WriteWithSpec(conn *grpclib.ClientConn, name, group string,
+       baseTime time.Time, interval time.Duration, specDataPairs 
...SpecWithData,
+) {
+       ctx := context.Background()
+       md := &commonv1.Metadata{
+               Name:  name,
+               Group: group,
+       }
+
+       schemaClient := databasev1.NewTraceRegistryServiceClient(conn)
+       resp, err := schemaClient.Get(ctx, 
&databasev1.TraceRegistryServiceGetRequest{Metadata: md})
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       md = resp.GetTrace().GetMetadata()
+
+       c := tracev1.NewTraceServiceClient(conn)
+       writeClient, err := c.Write(ctx)
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+
+       isFirstRequest := true
+       version := uint64(1)
+       currentTime := baseTime
+       for _, pair := range specDataPairs {
+               var templates []interface{}
+               content, err := dataFS.ReadFile("testdata/" + pair.DataFile)
+               gm.Expect(err).ShouldNot(gm.HaveOccurred())
+               gm.Expect(json.Unmarshal(content, 
&templates)).ShouldNot(gm.HaveOccurred())
+
+               isFirstForSpec := true
+               for i, template := range templates {
+                       templateMap, ok := template.(map[string]interface{})
+                       gm.Expect(ok).To(gm.BeTrue())
+
+                       spanData, ok := templateMap["span"].(string)
+                       gm.Expect(ok).To(gm.BeTrue())
+
+                       tagsData, ok := templateMap["tags"].([]interface{})
+                       gm.Expect(ok).To(gm.BeTrue())
+
+                       var tagValues []*modelv1.TagValue
+                       for _, tag := range tagsData {
+                               tagBytes, err := json.Marshal(tag)
+                               gm.Expect(err).ShouldNot(gm.HaveOccurred())
+                               tagValue := &modelv1.TagValue{}
+                               gm.Expect(protojson.Unmarshal(tagBytes, 
tagValue)).ShouldNot(gm.HaveOccurred())
+                               tagValues = append(tagValues, tagValue)
+                       }
+
+                       timestamp := currentTime.Add(time.Duration(i) * 
interval)
+                       timestampTag := &modelv1.TagValue{
+                               Value: &modelv1.TagValue_Timestamp{
+                                       Timestamp: timestamppb.New(timestamp),
+                               },
+                       }
+                       tagValues = append(tagValues, timestampTag)
+
+                       req := &tracev1.WriteRequest{
+                               Tags:    tagValues,
+                               Span:    []byte(spanData),
+                               Version: version,
+                       }
+                       if isFirstRequest {
+                               req.Metadata = md
+                               isFirstRequest = false
+                       }
+                       if isFirstForSpec {
+                               req.TagSpec = pair.Spec
+                               isFirstForSpec = false
+                       }
+                       gm.Expect(writeClient.Send(req)).Should(gm.Succeed())
+                       version++
+               }
+               currentTime = currentTime.Add(time.Duration(len(templates)) * 
interval)
+       }
+
+       gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
+       gm.Eventually(func() error {
+               _, err := writeClient.Recv()
+               return err
+       }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
+}
+
 // unmarshalYAMLWithSpanEncoding decodes YAML with special handling for span 
data.
 // It converts plain strings in the YAML to base64 encoded strings before 
protobuf unmarshaling.
 func unmarshalYAMLWithSpanEncoding(yamlData []byte, response 
*tracev1.QueryResponse) {
diff --git a/test/cases/trace/data/input/write_spec.ql 
b/test/cases/trace/data/input/write_spec.ql
new file mode 100644
index 00000000..3f9ee96e
--- /dev/null
+++ b/test/cases/trace/data/input/write_spec.ql
@@ -0,0 +1,21 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT trace_id, state, duration, span_id FROM TRACE sw IN test-trace-spec
+TIME > '-15m'
+WHERE trace_id IN ('spec_trace_001', 'spec_trace_003')
diff --git a/test/cases/trace/data/input/write_spec.yml 
b/test/cases/trace/data/input/write_spec.yml
new file mode 100644
index 00000000..3ca0807d
--- /dev/null
+++ b/test/cases/trace/data/input/write_spec.yml
@@ -0,0 +1,28 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["test-trace-spec"]
+tag_projection: ["trace_id", "state", "duration", "span_id"]
+criteria:
+  condition:
+    name: "trace_id"
+    op: "BINARY_OP_IN"
+    value:
+      str_array:
+        value: ["spec_trace_001", "spec_trace_003"]
+
diff --git a/test/cases/trace/data/testdata/sw_spec_order.json 
b/test/cases/trace/data/testdata/sw_spec_order.json
new file mode 100644
index 00000000..264e0d33
--- /dev/null
+++ b/test/cases/trace/data/testdata/sw_spec_order.json
@@ -0,0 +1,123 @@
+[
+  {
+    "tags": [
+      {
+        "str": {
+          "value": "spec_trace_001"
+        }
+      },
+      {
+        "int": {
+          "value": 1
+        }
+      },
+      {
+        "str": {
+          "value": "webapp_service"
+        }
+      },
+      {
+        "str": {
+          "value": "webapp_instance_1"
+        }
+      },
+      {
+        "str": {
+          "value": "/home_endpoint"
+        }
+      },
+      {
+        "int": {
+          "value": 1100
+        }
+      },
+      {
+        "str": {
+          "value": "spec_span_001_1"
+        }
+      }
+    ],
+    "span": "spec_trace_001_span_1"
+  },
+  {
+    "tags": [
+      {
+        "str": {
+          "value": "spec_trace_001"
+        }
+      },
+      {
+        "int": {
+          "value": 0
+        }
+      },
+      {
+        "str": {
+          "value": "webapp_service"
+        }
+      },
+      {
+        "str": {
+          "value": "webapp_instance_2"
+        }
+      },
+      {
+        "str": {
+          "value": "/product_endpoint"
+        }
+      },
+      {
+        "int": {
+          "value": 600
+        }
+      },
+      {
+        "str": {
+          "value": "spec_span_001_2"
+        }
+      }
+    ],
+    "span": "spec_trace_001_span_2"
+  },
+  {
+    "tags": [
+      {
+        "str": {
+          "value": "spec_trace_002"
+        }
+      },
+      {
+        "int": {
+          "value": 1
+        }
+      },
+      {
+        "str": {
+          "value": "webapp_service"
+        }
+      },
+      {
+        "str": {
+          "value": "webapp_instance_3"
+        }
+      },
+      {
+        "str": {
+          "value": "/item_endpoint"
+        }
+      },
+      {
+        "int": {
+          "value": 900
+        }
+      },
+      {
+        "str": {
+          "value": "spec_span_002_1"
+        }
+      }
+    ],
+    "span": "spec_trace_002_span_1"
+  }
+]
+
diff --git a/test/cases/trace/data/testdata/sw_spec_order2.json b/test/cases/trace/data/testdata/sw_spec_order2.json
new file mode 100644
index 00000000..6b4d8164
--- /dev/null
+++ b/test/cases/trace/data/testdata/sw_spec_order2.json
@@ -0,0 +1,83 @@
+[
+  {
+    "tags": [
+      {
+        "str": {
+          "value": "spec_span_003_1"
+        }
+      },
+      {
+        "int": {
+          "value": 750
+        }
+      },
+      {
+        "str": {
+          "value": "/order_endpoint"
+        }
+      },
+      {
+        "str": {
+          "value": "webapp_instance_1"
+        }
+      },
+      {
+        "str": {
+          "value": "webapp_service"
+        }
+      },
+      {
+        "int": {
+          "value": 0
+        }
+      },
+      {
+        "str": {
+          "value": "spec_trace_003"
+        }
+      }
+    ],
+    "span": "spec_trace_003_span_1"
+  },
+  {
+    "tags": [
+      {
+        "str": {
+          "value": "spec_span_003_2"
+        }
+      },
+      {
+        "int": {
+          "value": 450
+        }
+      },
+      {
+        "str": {
+          "value": "/cart_endpoint"
+        }
+      },
+      {
+        "str": {
+          "value": "webapp_instance_2"
+        }
+      },
+      {
+        "str": {
+          "value": "webapp_service"
+        }
+      },
+      {
+        "int": {
+          "value": 0
+        }
+      },
+      {
+        "str": {
+          "value": "spec_trace_003"
+        }
+      }
+    ],
+    "span": "spec_trace_003_span_2"
+  }
+]
+
diff --git a/test/cases/trace/data/want/write_spec.yml b/test/cases/trace/data/want/write_spec.yml
new file mode 100644
index 00000000..b0d6305c
--- /dev/null
+++ b/test/cases/trace/data/want/write_spec.yml
@@ -0,0 +1,96 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+traces:
+  - spans:
+      - span: spec_trace_001_span_1
+        spanId: spec_span_001_1
+        tags:
+          - key: state
+            value:
+              int:
+                value: "1"
+          - key: duration
+            value:
+              int:
+                value: "1100"
+          - key: trace_id
+            value:
+              str:
+                value: spec_trace_001
+          - key: span_id
+            value:
+              str:
+                value: spec_span_001_1
+      - span: spec_trace_001_span_2
+        spanId: spec_span_001_2
+        tags:
+          - key: state
+            value:
+              int: {}
+          - key: duration
+            value:
+              int:
+                value: "600"
+          - key: trace_id
+            value:
+              str:
+                value: spec_trace_001
+          - key: span_id
+            value:
+              str:
+                value: spec_span_001_2
+    traceId: spec_trace_001
+  - spans:
+      - span: spec_trace_003_span_1
+        spanId: spec_span_003_1
+        tags:
+          - key: state
+            value:
+              int: {}
+          - key: duration
+            value:
+              int:
+                value: "750"
+          - key: trace_id
+            value:
+              str:
+                value: spec_trace_003
+          - key: span_id
+            value:
+              str:
+                value: spec_span_003_1
+      - span: spec_trace_003_span_2
+        spanId: spec_span_003_2
+        tags:
+          - key: state
+            value:
+              int: {}
+          - key: duration
+            value:
+              int:
+                value: "450"
+          - key: trace_id
+            value:
+              str:
+                value: spec_trace_003
+          - key: span_id
+            value:
+              str:
+                value: spec_span_003_2
+    traceId: spec_trace_003
+
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index fdf141f1..cc72cf12 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -59,4 +59,5 @@ var _ = g.DescribeTable("Scanning Traces", func(args helpers.Args) {
        g.Entry("multi-groups: new tag", helpers.Args{Input: "multi_group_new_tag", Duration: 1 * time.Hour}),
        g.Entry("multi-groups: tag type change", helpers.Args{Input: "multi_group_tag_type", Duration: 1 * time.Hour}),
        g.Entry("multi-groups: sort by duration", helpers.Args{Input: "multi_group_sort_duration", Duration: 1 * time.Hour}),
+       g.Entry("write spec", helpers.Args{Input: "write_spec", Duration: 1 * time.Hour, DisOrder: true}),
 )

Reply via email to