This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new ccbfd1f6 Support writing stream and trace data with specifications (#888)
ccbfd1f6 is described below
commit ccbfd1f6ac4b2c342362dae8b524a0f2b9a80cd7
Author: Huang Youliang <[email protected]>
AuthorDate: Fri Dec 12 09:42:16 2025 +0800
Support writing stream and trace data with specifications (#888)
* Support writing stream and trace data with specifications
Co-authored-by: Gao Hongtao <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
CHANGES.md | 1 +
banyand/liaison/grpc/discovery.go | 67 +++--
banyand/liaison/grpc/locator.go | 120 +++++++++
banyand/liaison/grpc/measure.go | 183 ++++---------
banyand/liaison/grpc/stream.go | 167 ++++++++----
banyand/liaison/grpc/trace.go | 289 ++++++++++++++-------
banyand/metadata/schema/etcd.go | 8 +-
banyand/stream/write_liaison.go | 24 +-
banyand/stream/write_standalone.go | 99 +++++--
banyand/trace/write_liaison.go | 31 ++-
banyand/trace/write_standalone.go | 126 +++++----
pkg/test/stream/testdata/group.json | 21 +-
pkg/test/stream/testdata/group_with_stages.json | 33 +++
.../testdata/index_rule_bindings/sw_spec.json | 27 ++
pkg/test/stream/testdata/streams/sw_spec.json | 95 +++++++
.../trace/testdata/groups/test-trace-spec.json | 19 ++
.../testdata/groups_stages/test-trace-spec.json | 34 +++
.../testdata/index_rule_bindings/sw_spec.json | 14 +
.../trace/testdata/index_rules/duration_spec.json | 14 +
.../trace/testdata/index_rules/timestamp_spec.json | 14 +
pkg/test/trace/testdata/traces/sw_spec.json | 45 ++++
test/cases/init.go | 42 +++
test/cases/stream/data/data.go | 68 +++++
test/cases/stream/data/input/write_spec.ql | 21 ++
test/cases/stream/data/input/write_spec.yaml | 31 +++
test/cases/stream/data/testdata/sw_spec_order.json | 66 +++++
.../cases/stream/data/testdata/sw_spec_order2.json | 45 ++++
test/cases/stream/data/want/write_spec.yaml | 65 +++++
test/cases/stream/stream.go | 1 +
test/cases/trace/data/data.go | 88 +++++++
test/cases/trace/data/input/write_spec.ql | 21 ++
test/cases/trace/data/input/write_spec.yml | 28 ++
test/cases/trace/data/testdata/sw_spec_order.json | 123 +++++++++
test/cases/trace/data/testdata/sw_spec_order2.json | 83 ++++++
test/cases/trace/data/want/write_spec.yml | 96 +++++++
test/cases/trace/trace.go | 1 +
36 files changed, 1827 insertions(+), 383 deletions(-)
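To make the write-path change concrete before the diffs: the first request of a write stream may now carry a specification naming the tag families and tags the client will send, and later requests on the same gRPC stream reuse it. A minimal client-side sketch, assuming the streamv1 field names implied by the getters this patch calls (Metadata, TagFamilySpec) and a hypothetical "sw" stream in group "default":

package clientsketch

import (
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
)

// buildFirstRequest sketches the opening message of a spec-based write stream.
// Field names are inferred from the accessors exercised by this patch
// (GetMetadata, GetTagFamilySpec, GetElement); treat them as assumptions.
func buildFirstRequest() *streamv1.WriteRequest {
	return &streamv1.WriteRequest{
		Metadata: &commonv1.Metadata{Group: "default", Name: "sw"},
		// The spec declares, once, which tag families and tags follow and in
		// what order; later requests may omit Metadata and TagFamilySpec and
		// send only the element with values ordered per this spec.
		TagFamilySpec: []*streamv1.TagFamilySpec{
			{Name: "searchable", TagNames: []string{"trace_id", "service_id"}},
		},
	}
}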
diff --git a/CHANGES.md b/CHANGES.md
index 2993a4e4..966470fd 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -12,6 +12,7 @@ Release Notes.
- Add sorted query support for the Property.
- Update bydbQL to add sorted query support for the Property.
- Remove the windows arch for binary and docker image.
+- Support writing stream and trace data with specifications.
### Bug Fixes
diff --git a/banyand/liaison/grpc/discovery.go
b/banyand/liaison/grpc/discovery.go
index abeb9f99..db6953dd 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -50,6 +50,7 @@ func newDiscoveryService(kind schema.Kind, metadataRepo metadata.Repo, nodeRegis
er := &entityRepo{
entitiesMap: make(map[identity]partition.Locator),
measureMap: make(map[identity]*databasev1.Measure),
+ streamMap: make(map[identity]*databasev1.Stream),
traceMap: make(map[identity]*databasev1.Trace),
traceIDIndexMap: make(map[identity]int),
}
@@ -83,27 +84,45 @@ func (ds *discoveryService) SetLogger(log *logger.Logger) {
ds.shardingKeyRepo.log = log
}
-func (ds *discoveryService) navigateByLocator(metadata *commonv1.Metadata, tagFamilies []*modelv1.TagFamilyForWrite) (pbv1.EntityValues, common.ShardID, error) {
+func (ds *discoveryService) navigateByLocator(metadata *commonv1.Metadata, tagFamilies []*modelv1.TagFamilyForWrite,
+	specEntityLocator *specLocator, specShardingKeyLocator *specLocator,
+) (pbv1.EntityValues, common.ShardID, error) {
	shardNum, existed := ds.groupRepo.shardNum(metadata.Group)
	if !existed {
		return nil, common.ShardID(0), errors.Wrapf(errNotExist, "finding the shard num by: %v", metadata)
	}
	id := getID(metadata)
-	entityLocator, existed := ds.entityRepo.getLocator(id)
-	if !existed {
-		return nil, common.ShardID(0), errors.Wrapf(errNotExist, "finding the entity locator by: %v", metadata)
-	}
-	entityValues, shardID, err := entityLocator.Locate(metadata.Name, tagFamilies, shardNum)
-	if err != nil {
-		return nil, common.ShardID(0), err
-	}
-	shardingKeyLocator, existed := ds.shardingKeyRepo.getLocator(id)
-	if !existed {
-		return entityValues, shardID, nil
+	var entityValues pbv1.EntityValues
+	var shardID common.ShardID
+	var err error
+	if specEntityLocator != nil {
+		entityValues, shardID, err = specEntityLocator.Locate(metadata.Name, tagFamilies, shardNum)
+		if err != nil {
+			return nil, common.ShardID(0), err
+		}
+	} else {
+		entityLocator, existed := ds.entityRepo.getLocator(id)
+		if !existed {
+			return nil, common.ShardID(0), errors.Wrapf(errNotExist, "finding the entity locator by: %v", metadata)
+		}
+		entityValues, shardID, err = entityLocator.Locate(metadata.Name, tagFamilies, shardNum)
+		if err != nil {
+			return nil, common.ShardID(0), err
+		}
	}
-	_, shardID, err = shardingKeyLocator.Locate(metadata.Name, tagFamilies, shardNum)
-	if err != nil {
-		return nil, common.ShardID(0), err
+	if specShardingKeyLocator != nil {
+		_, shardID, err = specShardingKeyLocator.Locate(metadata.Name, tagFamilies, shardNum)
+		if err != nil {
+			return nil, common.ShardID(0), err
+		}
+	} else {
+		shardingKeyLocator, existed := ds.shardingKeyRepo.getLocator(id)
+		if existed {
+			_, shardID, err = shardingKeyLocator.Locate(metadata.Name, tagFamilies, shardNum)
+			if err != nil {
+				return nil, common.ShardID(0), err
+			}
+		}
	}
	return entityValues, shardID, nil
}
@@ -192,6 +211,7 @@ type entityRepo struct {
log *logger.Logger
entitiesMap map[identity]partition.Locator
measureMap map[identity]*databasev1.Measure
+ streamMap map[identity]*databasev1.Stream
traceMap map[identity]*databasev1.Trace
traceIDIndexMap map[identity]int // Cache trace ID tag index
sync.RWMutex
@@ -255,11 +275,14 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
	e.RWMutex.Lock()
	defer e.RWMutex.Unlock()
	e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, ModRevision: modRevision}
- if schemaMetadata.Kind == schema.KindMeasure {
+ switch schemaMetadata.Kind {
+ case schema.KindMeasure:
measure := schemaMetadata.Spec.(*databasev1.Measure)
e.measureMap[id] = measure
- } else {
-	delete(e.measureMap, id) // Ensure measure is not stored for streams
+ case schema.KindStream:
+ stream := schemaMetadata.Spec.(*databasev1.Stream)
+ e.streamMap[id] = stream
+ default:
}
}
@@ -301,6 +324,7 @@ func (e *entityRepo) OnDelete(schemaMetadata schema.Metadata) {
defer e.RWMutex.Unlock()
delete(e.entitiesMap, id)
delete(e.measureMap, id) // Clean up measure
+ delete(e.streamMap, id) // Clean up stream
delete(e.traceMap, id) // Clean up trace
delete(e.traceIDIndexMap, id) // Clean up trace ID index
}
@@ -342,6 +366,13 @@ func (e *entityRepo) getMeasure(id identity) (*databasev1.Measure, bool) {
return m, ok
}
+func (e *entityRepo) getStream(id identity) (*databasev1.Stream, bool) {
+ e.RWMutex.RLock()
+ defer e.RWMutex.RUnlock()
+ s, ok := e.streamMap[id]
+ return s, ok
+}
+
var _ schema.EventHandler = (*shardingKeyRepo)(nil)
type shardingKeyRepo struct {
diff --git a/banyand/liaison/grpc/locator.go b/banyand/liaison/grpc/locator.go
new file mode 100644
index 00000000..7b2d986c
--- /dev/null
+++ b/banyand/liaison/grpc/locator.go
@@ -0,0 +1,120 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+	"github.com/apache/skywalking-banyandb/api/common"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/pkg/partition"
+	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+type tagFamilySpec interface {
+	GetName() string
+	GetTagNames() []string
+}
+
+type specLocator struct {
+	tagLocators []partition.TagLocator
+}
+
+func newSpecLocator[T tagFamilySpec](schemaFamilies []*databasev1.TagFamilySpec, tagNames []string, specFamilies []T) *specLocator {
+	specFamilyMap, specTagMaps := buildSpecMaps(specFamilies)
+	locator := make([]partition.TagLocator, 0, len(tagNames))
+	for _, tagName := range tagNames {
+		familyIdx, tagIdx := findTagInSpec(schemaFamilies, tagName, specFamilyMap, specTagMaps)
+		locator = append(locator, partition.TagLocator{FamilyOffset: familyIdx, TagOffset: tagIdx})
+	}
+	return &specLocator{tagLocators: locator}
+}
+
+func buildSpecMaps[T tagFamilySpec](specFamilies []T) (map[string]int, map[string]map[string]int) {
+	specFamilyMap := make(map[string]int, len(specFamilies))
+	specTagMaps := make(map[string]map[string]int, len(specFamilies))
+	for i, specFamily := range specFamilies {
+		specFamilyMap[specFamily.GetName()] = i
+		tagMap := make(map[string]int, len(specFamily.GetTagNames()))
+		for j, tagName := range specFamily.GetTagNames() {
+			tagMap[tagName] = j
+		}
+		specTagMaps[specFamily.GetName()] = tagMap
+	}
+	return specFamilyMap, specTagMaps
+}
+
+func findTagInSpec(schemaFamilies []*databasev1.TagFamilySpec, tagName string,
+	specFamilyMap map[string]int, specTagMaps map[string]map[string]int,
+) (familyIdx, tagIdx int) {
+	for _, schemaFamily := range schemaFamilies {
+		for _, schemaTag := range schemaFamily.GetTags() {
+			if schemaTag.GetName() != tagName {
+				continue
+			}
+			fIdx, ok := specFamilyMap[schemaFamily.GetName()]
+			if !ok {
+				return -1, -1
+			}
+			tagMap := specTagMaps[schemaFamily.GetName()]
+			if tagMap == nil {
+				return -1, -1
+			}
+			tIdx, ok := tagMap[tagName]
+			if !ok {
+				return -1, -1
+			}
+			return fIdx, tIdx
+		}
+	}
+	return -1, -1
+}
+
+// Locate finds the entity values and shard ID using the spec locator.
+func (l *specLocator) Locate(subject string, tagFamilies []*modelv1.TagFamilyForWrite, shardNum uint32) (pbv1.EntityValues, common.ShardID, error) {
+	entity, tagValues, err := l.Find(subject, tagFamilies)
+	if err != nil {
+		return nil, 0, err
+	}
+	id, err := partition.ShardID(entity.Marshal(), shardNum)
+	if err != nil {
+		return nil, 0, err
+	}
+	return tagValues, common.ShardID(id), nil
+}
+
+// Find finds the entity and entity values from tag families.
+func (l *specLocator) Find(subject string, tagFamilies []*modelv1.TagFamilyForWrite) (pbv1.Entity, pbv1.EntityValues, error) {
+	entityValues := make(pbv1.EntityValues, len(l.tagLocators)+1)
+	entityValues[0] = pbv1.EntityStrValue(subject)
+	for i, index := range l.tagLocators {
+		if index.FamilyOffset < 0 || index.TagOffset < 0 {
+			entityValues[i+1] = &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
+			continue
+		}
+		tag, err := partition.GetTagByOffset(tagFamilies, index.FamilyOffset, index.TagOffset)
+		if err != nil {
+			return nil, nil, err
+		}
+		entityValues[i+1] = tag
+	}
+	entity, err := entityValues.ToEntity()
+	if err != nil {
+		return nil, nil, err
+	}
+	return entity, entityValues, nil
+}
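The resolution rule above is worth restating: for each entity tag name, the locator finds the owning family in the schema, then looks that family and tag up in the client's spec; any miss is recorded as offsets (-1, -1), which Find later writes as a Null tag value instead of failing. A plain-data sketch of that behavior (hypothetical tag names, no proto types):

package main

import "fmt"

// Reruns the locator.go rule over plain maps: declared tags resolve to spec
// offsets, undeclared entity tags fall back to (-1, -1) and become Null.
func main() {
	specTagMaps := map[string]map[string]int{
		// The client declared only two tags, in its own order.
		"searchable": {"service_id": 0, "trace_id": 1},
	}
	for _, entityTag := range []string{"service_id", "endpoint_id"} {
		if idx, ok := specTagMaps["searchable"][entityTag]; ok {
			fmt.Printf("%s -> family 0, tag offset %d\n", entityTag, idx)
		} else {
			fmt.Printf("%s -> offsets (-1, -1): written as Null\n", entityTag)
		}
	}
}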
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 001b6d02..125c8aea 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -29,7 +29,6 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -37,12 +36,23 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
+type measureTagFamilySpec struct {
+ *measurev1.TagFamilySpec
+}
+
+func (m measureTagFamilySpec) GetName() string {
+ return m.TagFamilySpec.GetName()
+}
+
+func (m measureTagFamilySpec) GetTagNames() []string {
+ return m.TagFamilySpec.GetTagNames()
+}
+
type measureService struct {
measurev1.UnimplementedMeasureServiceServer
ingestionAccessLog accesslog.Log
@@ -87,6 +97,8 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
var metadata *commonv1.Metadata
var spec *measurev1.DataPointSpec
+ var specEntityLocator *specLocator
+ var specShardingKeyLocator *specLocator
isFirstRequest := true
nodeMetadataSent := make(map[string]bool)
nodeSpecSent := make(map[string]bool)
@@ -118,10 +130,10 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
				return errors.New("metadata is required for the first request of gRPC stream")
			}
			isFirstRequest = false
-
			if writeRequest.GetDataPointSpec() != nil {
				spec = writeRequest.GetDataPointSpec()
				nodeSpecSent = make(map[string]bool)
+				specEntityLocator, specShardingKeyLocator = ms.buildSpecLocators(metadata, spec)
			}
			ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, "measure", "write")
@@ -130,12 +142,35 @@ func (ms *measureService) Write(measure
measurev1.MeasureService_WriteServer) er
continue
}
- if err := ms.processAndPublishRequest(ctx, writeRequest,
metadata, spec, publisher, &succeedSent, measure, nodeMetadataSent,
nodeSpecSent); err != nil {
+ if err := ms.processAndPublishRequest(ctx, writeRequest,
metadata, spec,
+ specEntityLocator, specShardingKeyLocator, publisher,
&succeedSent, measure, nodeMetadataSent, nodeSpecSent); err != nil {
continue
}
}
}
+func (ms *measureService) buildSpecLocators(metadata *commonv1.Metadata, spec *measurev1.DataPointSpec) (*specLocator, *specLocator) {
+	if spec == nil {
+		return nil, nil
+	}
+	id := getID(metadata)
+	measure, ok := ms.entityRepo.getMeasure(id)
+	if !ok {
+		return nil, nil
+	}
+	specFamilies := make([]tagFamilySpec, len(spec.GetTagFamilySpec()))
+	for i, f := range spec.GetTagFamilySpec() {
+		specFamilies[i] = measureTagFamilySpec{f}
+	}
+	entityLocator := newSpecLocator(measure.GetTagFamilies(), measure.GetEntity().GetTagNames(), specFamilies)
+	var shardingKeyLocator *specLocator
+	shardingKey := measure.GetShardingKey()
+	if shardingKey != nil && len(shardingKey.GetTagNames()) > 0 {
+		shardingKeyLocator = newSpecLocator(measure.GetTagFamilies(), shardingKey.GetTagNames(), specFamilies)
+	}
+	return entityLocator, shardingKeyLocator
+}
+
func (ms *measureService) validateWriteRequest(writeRequest
*measurev1.WriteRequest,
metadata *commonv1.Metadata, measure
measurev1.MeasureService_WriteServer,
) modelv1.Status {
@@ -148,7 +183,7 @@ func (ms *measureService) validateWriteRequest(writeRequest
*measurev1.WriteRequ
if metadata.ModRevision > 0 {
measureCache, existed :=
ms.entityRepo.getLocator(getID(metadata))
if !existed {
-			ms.l.Error().Stringer("written", writeRequest).Msg("failed to measure schema not found")
+			ms.l.Error().Stringer("written", writeRequest).Msg("measure schema not found")
			ms.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure)
return modelv1.Status_STATUS_NOT_FOUND
}
@@ -163,8 +198,9 @@ func (ms *measureService) validateWriteRequest(writeRequest
*measurev1.WriteRequ
}
func (ms *measureService) processAndPublishRequest(ctx context.Context,
writeRequest *measurev1.WriteRequest,
- metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, publisher
queue.BatchPublisher,
- succeedSent *[]succeedSentMessage, measure
measurev1.MeasureService_WriteServer,
+ metadata *commonv1.Metadata, spec *measurev1.DataPointSpec,
+ specEntityLocator *specLocator, specShardingKeyLocator *specLocator,
+ publisher queue.BatchPublisher, succeedSent *[]succeedSentMessage,
measure measurev1.MeasureService_WriteServer,
nodeMetadataSent map[string]bool, nodeSpecSent map[string]bool,
) error {
// Retry with backoff when encountering errNotExist
@@ -176,7 +212,7 @@ func (ms *measureService) processAndPublishRequest(ctx
context.Context, writeReq
retryInterval := 10 * time.Millisecond
startTime := time.Now()
for {
- tagValues, shardID, err = ms.navigate(metadata,
writeRequest, spec)
+ tagValues, shardID, err = ms.navigate(metadata,
writeRequest, specEntityLocator, specShardingKeyLocator)
if err == nil || !errors.Is(err, errNotExist) ||
time.Since(startTime) > ms.maxWaitDuration {
break
}
@@ -189,7 +225,7 @@ func (ms *measureService) processAndPublishRequest(ctx
context.Context, writeReq
}
}
} else {
- tagValues, shardID, err = ms.navigate(metadata, writeRequest,
spec)
+ tagValues, shardID, err = ms.navigate(metadata, writeRequest,
specEntityLocator, specShardingKeyLocator)
}
if err != nil {
@@ -266,134 +302,11 @@ func (ms *measureService) publishToNodes(ctx
context.Context, writeRequest *meas
return []string{nodeID}, nil
}
-func (ms *measureService) navigate(metadata *commonv1.Metadata,
- writeRequest *measurev1.WriteRequest, spec *measurev1.DataPointSpec,
+func (ms *measureService) navigate(metadata *commonv1.Metadata, writeRequest
*measurev1.WriteRequest,
+ specEntityLocator *specLocator, specShardingKeyLocator *specLocator,
) (pbv1.EntityValues, common.ShardID, error) {
tagFamilies := writeRequest.GetDataPoint().GetTagFamilies()
- if spec == nil {
- return ms.navigateByLocator(metadata, tagFamilies)
- }
- return ms.navigateByTagSpec(metadata, spec, tagFamilies)
-}
-
-func (ms *measureService) navigateByTagSpec(
- metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, tagFamilies
[]*modelv1.TagFamilyForWrite,
-) (pbv1.EntityValues, common.ShardID, error) {
- shardNum, existed := ms.groupRepo.shardNum(metadata.Group)
- if !existed {
- return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding the shard num by: %v", metadata)
- }
- id := getID(metadata)
- measure, ok := ms.entityRepo.getMeasure(id)
- if !ok {
- return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding measure schema by: %v", metadata)
- }
- specFamilyMap, specTagMaps := ms.buildSpecMaps(spec)
-
- entityValues := ms.findTagValuesByNames(
- metadata.Name,
- measure.GetTagFamilies(),
- tagFamilies,
- measure.GetEntity().GetTagNames(),
- specFamilyMap,
- specTagMaps,
- )
- entity, err := entityValues.ToEntity()
- if err != nil {
- return nil, common.ShardID(0), err
- }
-
- shardingKey := measure.GetShardingKey()
- if shardingKey != nil && len(shardingKey.GetTagNames()) > 0 {
- shardingKeyValues := ms.findTagValuesByNames(
- metadata.Name,
- measure.GetTagFamilies(),
- tagFamilies,
- shardingKey.GetTagNames(),
- specFamilyMap,
- specTagMaps,
- )
- shardingEntity, shardingErr := shardingKeyValues.ToEntity()
- if shardingErr != nil {
- return nil, common.ShardID(0), shardingErr
- }
- shardID, shardingErr :=
partition.ShardID(shardingEntity.Marshal(), shardNum)
- if shardingErr != nil {
- return nil, common.ShardID(0), shardingErr
- }
- return entityValues, common.ShardID(shardID), nil
- }
-
- shardID, err := partition.ShardID(entity.Marshal(), shardNum)
- if err != nil {
- return nil, common.ShardID(0), err
- }
- return entityValues, common.ShardID(shardID), nil
-}
-
-func (ms *measureService) buildSpecMaps(spec *measurev1.DataPointSpec)
(map[string]int, map[string]map[string]int) {
- specFamilyMap := make(map[string]int)
- specTagMaps := make(map[string]map[string]int)
- for i, specFamily := range spec.GetTagFamilySpec() {
- specFamilyMap[specFamily.GetName()] = i
- tagMap := make(map[string]int)
- for j, tagName := range specFamily.GetTagNames() {
- tagMap[tagName] = j
- }
- specTagMaps[specFamily.GetName()] = tagMap
- }
- return specFamilyMap, specTagMaps
-}
-
-func (ms *measureService) findTagValuesByNames(
- subject string,
- schemaFamilies []*databasev1.TagFamilySpec,
- srcTagFamilies []*modelv1.TagFamilyForWrite,
- tagNames []string,
- specFamilyMap map[string]int,
- specTagMaps map[string]map[string]int,
-) pbv1.EntityValues {
- entityValues := make(pbv1.EntityValues, len(tagNames)+1)
- entityValues[0] = pbv1.EntityStrValue(subject)
- for i, tagName := range tagNames {
- tagValue := ms.findTagValueByName(schemaFamilies,
srcTagFamilies, tagName, specFamilyMap, specTagMaps)
- if tagValue == nil {
- entityValues[i+1] = &modelv1.TagValue{Value:
&modelv1.TagValue_Null{}}
- } else {
- entityValues[i+1] = tagValue
- }
- }
- return entityValues
-}
-
-func (ms *measureService) findTagValueByName(
- schemaFamilies []*databasev1.TagFamilySpec,
- srcTagFamilies []*modelv1.TagFamilyForWrite,
- tagName string,
- specFamilyMap map[string]int,
- specTagMaps map[string]map[string]int,
-) *modelv1.TagValue {
- for _, schemaFamily := range schemaFamilies {
- for _, schemaTag := range schemaFamily.GetTags() {
- if schemaTag.GetName() != tagName {
- continue
- }
- familyIdx, ok := specFamilyMap[schemaFamily.GetName()]
- if !ok || familyIdx >= len(srcTagFamilies) {
- return nil
- }
- tagMap := specTagMaps[schemaFamily.GetName()]
- if tagMap == nil {
- return nil
- }
- tagIdx, ok := tagMap[tagName]
- if !ok || tagIdx >=
len(srcTagFamilies[familyIdx].GetTags()) {
- return nil
- }
- return srcTagFamilies[familyIdx].GetTags()[tagIdx]
- }
- }
- return nil
+	return ms.navigateByLocator(metadata, tagFamilies, specEntityLocator, specShardingKeyLocator)
}
func (ms *measureService) sendReply(metadata *commonv1.Metadata, status modelv1.Status, messageID uint64, measure measurev1.MeasureService_WriteServer) {
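The measure, stream, and trace write paths all share the retry shape seen in processAndPublishRequest above: retry only while the error is errNotExist, start at 10ms, grow the interval by 1.5x, and stop once maxWaitDuration has elapsed. A standalone sketch of that loop (the real code may also cap the interval; the hunks here do not show one):

package retrysketch

import (
	"errors"
	"time"
)

var errNotExist = errors.New("not exist")

// navigateWithRetry retries navigate only on errNotExist, sleeping 10ms at
// first and growing the interval by 1.5x, until maxWaitDuration has elapsed.
func navigateWithRetry(maxWaitDuration time.Duration, navigate func() error) error {
	if maxWaitDuration <= 0 {
		return navigate() // retries disabled: a single attempt
	}
	retryInterval := 10 * time.Millisecond
	startTime := time.Now()
	for {
		err := navigate()
		if err == nil || !errors.Is(err, errNotExist) || time.Since(startTime) > maxWaitDuration {
			return err
		}
		time.Sleep(retryInterval)
		retryInterval = time.Duration(float64(retryInterval) * 1.5)
	}
}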
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 81cd420e..af570cfa 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -41,6 +41,18 @@ import (
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
+type streamTagFamilySpec struct {
+ *streamv1.TagFamilySpec
+}
+
+func (s streamTagFamilySpec) GetName() string {
+ return s.TagFamilySpec.GetName()
+}
+
+func (s streamTagFamilySpec) GetTagNames() []string {
+ return s.TagFamilySpec.GetTagNames()
+}
+
type streamService struct {
streamv1.UnimplementedStreamServiceServer
ingestionAccessLog accesslog.Log
@@ -74,33 +86,63 @@ func (s *streamService) activeQueryAccessLog(root string, sampled bool) (err err
return nil
}
-func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest)
error {
- if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err !=
nil {
- s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the
element time is invalid")
- return err
+func (s *streamService) validateWriteRequest(writeEntity
*streamv1.WriteRequest,
+ metadata *commonv1.Metadata, stream streamv1.StreamService_WriteServer,
+) modelv1.Status {
+ if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp);
errTime != nil {
+ s.l.Error().Err(errTime).Stringer("written",
writeEntity).Msg("the element time is invalid")
+ s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP,
writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_INVALID_TIMESTAMP
}
- return nil
-}
-func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest)
error {
- if writeEntity.Metadata.ModRevision > 0 {
- streamCache, existed :=
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+ if metadata.ModRevision > 0 {
+ streamCache, existed := s.entityRepo.getLocator(getID(metadata))
if !existed {
- return errors.New("stream schema not found")
+ s.l.Error().Stringer("written",
writeEntity).Msg("stream schema not found")
+ s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND,
writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_NOT_FOUND
}
- if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
- return errors.New("expired stream schema")
+ if metadata.ModRevision != streamCache.ModRevision {
+ s.l.Error().Stringer("written", writeEntity).Msg("the
stream schema is expired")
+ s.sendReply(metadata,
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_EXPIRED_SCHEMA
}
}
- return nil
+
+ return modelv1.Status_STATUS_SUCCEED
+}
+
+func (s *streamService) navigate(metadata *commonv1.Metadata, writeRequest
*streamv1.WriteRequest,
+ specLocator *specLocator,
+) (pbv1.EntityValues, common.ShardID, error) {
+ tagFamilies := writeRequest.GetElement().GetTagFamilies()
+ return s.navigateByLocator(metadata, tagFamilies, specLocator, nil)
}
-func (s *streamService) navigateWithRetry(writeEntity *streamv1.WriteRequest)
(tagValues pbv1.EntityValues, shardID common.ShardID, err error) {
+func (s *streamService) buildSpecLocator(metadata *commonv1.Metadata, spec []*streamv1.TagFamilySpec) *specLocator {
+	if spec == nil {
+		return nil
+	}
+	id := getID(metadata)
+	stream, ok := s.entityRepo.getStream(id)
+	if !ok {
+		return nil
+	}
+	specFamilies := make([]tagFamilySpec, len(spec))
+	for i, f := range spec {
+		specFamilies[i] = streamTagFamilySpec{f}
+	}
+	return newSpecLocator(stream.GetTagFamilies(), stream.GetEntity().GetTagNames(), specFamilies)
+}
+
+func (s *streamService) navigateWithRetry(writeEntity *streamv1.WriteRequest,
metadata *commonv1.Metadata,
+ specLocator *specLocator,
+) (tagValues pbv1.EntityValues, shardID common.ShardID, err error) {
if s.maxWaitDuration > 0 {
retryInterval := 10 * time.Millisecond
startTime := time.Now()
for {
- tagValues, shardID, err =
s.navigateByLocator(writeEntity.GetMetadata(),
writeEntity.GetElement().GetTagFamilies())
+ tagValues, shardID, err = s.navigate(metadata,
writeEntity, specLocator)
if err == nil || !errors.Is(err, errNotExist) ||
time.Since(startTime) > s.maxWaitDuration {
return
}
@@ -111,26 +153,39 @@ func (s *streamService) navigateWithRetry(writeEntity
*streamv1.WriteRequest) (t
}
}
}
- return s.navigateByLocator(writeEntity.GetMetadata(),
writeEntity.GetElement().GetTagFamilies())
+ return s.navigate(metadata, writeEntity, specLocator)
}
func (s *streamService) publishMessages(
ctx context.Context,
publisher queue.BatchPublisher,
writeEntity *streamv1.WriteRequest,
+ metadata *commonv1.Metadata,
+ spec []*streamv1.TagFamilySpec,
shardID common.ShardID,
tagValues pbv1.EntityValues,
+ nodeMetadataSent map[string]bool,
+ nodeSpecSent map[string]bool,
) ([]string, error) {
iwr := &streamv1.InternalWriteRequest{
Request: writeEntity,
ShardId: uint32(shardID),
EntityValues: tagValues[1:].Encode(),
}
- nodeID, err :=
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(),
writeEntity.GetMetadata().GetName(), uint32(shardID), 0)
+ nodeID, err := s.nodeRegistry.Locate(metadata.GetGroup(),
metadata.GetName(), uint32(shardID), 0)
if err != nil {
return nil, err
}
+ if !nodeMetadataSent[nodeID] {
+ iwr.Request.Metadata = metadata
+ nodeMetadataSent[nodeID] = true
+ }
+ if spec != nil && !nodeSpecSent[nodeID] {
+ iwr.Request.TagFamilySpec = spec
+ nodeSpecSent[nodeID] = true
+ }
+
message :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
if _, err := publisher.Publish(ctx, data.TopicStreamWrite, message);
err != nil {
return nil, err
@@ -138,20 +193,24 @@ func (s *streamService) publishMessages(
return []string{nodeID}, nil
}
-func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error
{
- reply := func(metadata *commonv1.Metadata, status modelv1.Status,
messageId uint64, stream streamv1.StreamService_WriteServer, logger
*logger.Logger) {
- if status != modelv1.Status_STATUS_SUCCEED {
- s.metrics.totalStreamMsgReceivedErr.Inc(1,
metadata.Group, "stream", "write")
- }
- s.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "stream",
"write")
- if errResp := stream.Send(&streamv1.WriteResponse{Metadata:
metadata, Status: status.String(), MessageId: messageId}); errResp != nil {
- if dl := logger.Debug(); dl.Enabled() {
- dl.Err(errResp).Msg("failed to send stream
write response")
- }
- s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group,
"stream", "write")
+func (s *streamService) sendReply(metadata *commonv1.Metadata, status
modelv1.Status, messageID uint64, stream streamv1.StreamService_WriteServer) {
+ if metadata == nil {
+ s.l.Error().Stringer("status", status).Msg("metadata is nil,
cannot send reply")
+ return
+ }
+ if status != modelv1.Status_STATUS_SUCCEED {
+ s.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group,
"stream", "write")
+ }
+ s.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "stream", "write")
+ if errResp := stream.Send(&streamv1.WriteResponse{Metadata: metadata,
Status: status.String(), MessageId: messageID}); errResp != nil {
+ if dl := s.l.Debug(); dl.Enabled() {
+ dl.Err(errResp).Msg("failed to send stream write
response")
}
+ s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group,
"stream", "write")
}
+}
+func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error
{
s.metrics.totalStreamStarted.Inc(1, "stream", "write")
publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
start := time.Now()
@@ -169,7 +228,7 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
}
}
}
- reply(ssm.metadata, code, ssm.messageID, stream, s.l)
+ s.sendReply(ssm.metadata, code, ssm.messageID, stream)
}
if err != nil {
s.l.Error().Err(err).Msg("failed to close the
publisher")
@@ -182,6 +241,14 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
}()
ctx := stream.Context()
+
+ var metadata *commonv1.Metadata
+ var spec []*streamv1.TagFamilySpec
+ var specLocator *specLocator
+ isFirstRequest := true
+ nodeMetadataSent := make(map[string]bool)
+ nodeSpecSent := make(map[string]bool)
+
for {
select {
case <-ctx.Done():
@@ -200,30 +267,32 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
return err
}
- requestCount++
- s.metrics.totalStreamMsgReceived.Inc(1,
writeEntity.Metadata.Group, "stream", "write")
-
- if err = s.validateTimestamp(writeEntity); err != nil {
- reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetMessageId(), stream,
s.l)
- continue
+ if writeEntity.GetMetadata() != nil {
+ metadata = writeEntity.GetMetadata()
+ nodeMetadataSent = make(map[string]bool)
+ } else if isFirstRequest {
+ s.l.Error().Msg("metadata is required for the first
request of gRPC stream")
+ s.sendReply(nil,
modelv1.Status_STATUS_METADATA_REQUIRED, writeEntity.GetMessageId(), stream)
+ return errors.New("metadata is required for the first
request of gRPC stream")
+ }
+ isFirstRequest = false
+ if writeEntity.GetTagFamilySpec() != nil {
+ spec = writeEntity.GetTagFamilySpec()
+ nodeSpecSent = make(map[string]bool)
+ specLocator = s.buildSpecLocator(metadata, spec)
}
- if err = s.validateMetadata(writeEntity); err != nil {
- status := modelv1.Status_STATUS_INTERNAL_ERROR
- if errors.Is(err, errors.New("stream schema not
found")) {
- status = modelv1.Status_STATUS_NOT_FOUND
- } else if errors.Is(err, errors.New("expired stream
schema")) {
- status = modelv1.Status_STATUS_EXPIRED_SCHEMA
- }
- s.l.Error().Err(err).Stringer("written",
writeEntity).Msg("metadata validation failed")
- reply(writeEntity.GetMetadata(), status,
writeEntity.GetMessageId(), stream, s.l)
+ requestCount++
+ s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group,
"stream", "write")
+
+ if s.validateWriteRequest(writeEntity, metadata, stream) !=
modelv1.Status_STATUS_SUCCEED {
continue
}
- tagValues, shardID, err := s.navigateWithRetry(writeEntity)
+ tagValues, shardID, err := s.navigateWithRetry(writeEntity,
metadata, specLocator)
if err != nil {
s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("navigation failed")
- reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
+ s.sendReply(metadata,
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream)
continue
}
@@ -233,15 +302,15 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
}
}
- nodes, err := s.publishMessages(ctx, publisher, writeEntity,
shardID, tagValues)
+ nodes, err := s.publishMessages(ctx, publisher, writeEntity,
metadata, spec, shardID, tagValues, nodeMetadataSent, nodeSpecSent)
if err != nil {
s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("publishing failed")
- reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.l)
+ s.sendReply(metadata,
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream)
continue
}
succeedSent = append(succeedSent, succeedSentMessage{
- metadata: writeEntity.GetMetadata(),
+ metadata: metadata,
messageID: writeEntity.GetMessageId(),
nodes: nodes,
})
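A detail worth calling out in publishMessages above: full metadata and the tag spec are attached only the first time a given data node is targeted, and the nodeMetadataSent/nodeSpecSent maps are reset whenever the client supplies new values, so each node sees each version exactly once. A toy sketch of that bookkeeping:

package main

import "fmt"

// Mirrors the send-once maps: the first message to a node carries the full
// metadata; repeats to the same node omit it until the maps are reset.
func main() {
	nodeMetadataSent := map[string]bool{}
	for _, nodeID := range []string{"node-1", "node-2", "node-1"} {
		attachMetadata := !nodeMetadataSent[nodeID]
		nodeMetadataSent[nodeID] = true
		fmt.Printf("publish to %s, attach metadata: %v\n", nodeID, attachMetadata)
	}
	// When the client sends fresh metadata or a fresh spec, the maps are
	// recreated, so every node receives the new version on its next message.
}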
diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go
index 89125a21..083927c8 100644
--- a/banyand/liaison/grpc/trace.go
+++ b/banyand/liaison/grpc/trace.go
@@ -19,7 +19,6 @@ package grpc
import (
"context"
- "hash/fnv"
"io"
"time"
@@ -41,6 +40,30 @@ import (
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
+type traceSpecLocator struct {
+ traceIDIndex int
+ timestampIndex int
+}
+
+func newTraceSpecLocator(spec *tracev1.TagSpec, traceIDTagName, timestampTagName string) *traceSpecLocator {
+ locator := &traceSpecLocator{
+ traceIDIndex: -1,
+ timestampIndex: -1,
+ }
+ if spec == nil {
+ return locator
+ }
+ for i, tagName := range spec.GetTagNames() {
+ if tagName == traceIDTagName {
+ locator.traceIDIndex = i
+ }
+ if tagName == timestampTagName {
+ locator.timestampIndex = i
+ }
+ }
+ return locator
+}
+
type traceService struct {
tracev1.UnimplementedTraceServiceServer
ingestionAccessLog accesslog.Log
@@ -74,51 +97,115 @@ func (s *traceService) activeQueryAccessLog(root string, sampled bool) (err erro
return nil
}
-func (s *traceService) validateTimestamp(writeEntity *tracev1.WriteRequest)
error {
- // Get trace schema from entityRepo
- id := getID(writeEntity.GetMetadata())
+func (s *traceService) validateWriteRequest(writeEntity *tracev1.WriteRequest,
+ metadata *commonv1.Metadata, specLocator *traceSpecLocator, stream
tracev1.TraceService_WriteServer,
+) modelv1.Status {
+ id := getID(metadata)
traceEntity, existed := s.entityRepo.getTrace(id)
if !existed {
- return errors.New("trace schema not found")
+ s.l.Error().Stringer("written", writeEntity).Msg("trace schema
not found")
+ s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND,
writeEntity.GetVersion(), stream)
+ return modelv1.Status_STATUS_NOT_FOUND
}
- timestampTagName := traceEntity.GetTimestampTagName()
- for _, tag := range writeEntity.GetTags() {
- if tag.GetTimestamp() != nil {
- if err := timestamp.CheckPb(tag.GetTimestamp()); err !=
nil {
- s.l.Error().Stringer("written",
writeEntity).Err(err).Msg("the timestamp is invalid")
- return err
+	var foundTimestamp bool
+	if specLocator != nil && specLocator.timestampIndex >= 0 {
+		tags := writeEntity.GetTags()
+		if specLocator.timestampIndex < len(tags) {
+			tagValue := tags[specLocator.timestampIndex]
+			if tagValue != nil && tagValue.GetTimestamp() != nil {
+				if errTime := timestamp.CheckPb(tagValue.GetTimestamp()); errTime != nil {
+					s.l.Error().Err(errTime).Stringer("written", writeEntity).Msg("the timestamp is invalid")
+					s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetVersion(), stream)
+					return modelv1.Status_STATUS_INVALID_TIMESTAMP
+				}
+				foundTimestamp = true
			}
-			return nil
		}
	}
+	if !foundTimestamp {
+		for _, tag := range writeEntity.GetTags() {
+			if tag.GetTimestamp() != nil {
+				if errTime := timestamp.CheckPb(tag.GetTimestamp()); errTime != nil {
+					s.l.Error().Err(errTime).Stringer("written", writeEntity).Msg("the timestamp is invalid")
+					s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetVersion(), stream)
+					return modelv1.Status_STATUS_INVALID_TIMESTAMP
+				}
+				foundTimestamp = true
+				break
+			}
+		}
+	}
+	if !foundTimestamp {
+		timestampTagName := traceEntity.GetTimestampTagName()
+		s.l.Error().Stringer("written", writeEntity).Msg("timestamp tag not found: " + timestampTagName)
+		s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetVersion(), stream)
+		return modelv1.Status_STATUS_INVALID_TIMESTAMP
+	}
- return errors.New("timestamp tag not found: " + timestampTagName)
+ if metadata.ModRevision > 0 {
+ if metadata.ModRevision !=
traceEntity.GetMetadata().GetModRevision() {
+ s.l.Error().Stringer("written", writeEntity).Msg("the
trace schema is expired")
+ s.sendReply(metadata,
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetVersion(), stream)
+ return modelv1.Status_STATUS_EXPIRED_SCHEMA
+ }
+ }
+
+ return modelv1.Status_STATUS_SUCCEED
}
-func (s *traceService) validateMetadata(writeEntity *tracev1.WriteRequest)
error {
- if writeEntity.Metadata.ModRevision > 0 {
- traceCache, existed :=
s.entityRepo.getTrace(getID(writeEntity.GetMetadata()))
- if !existed {
- return errors.New("trace schema not found")
- }
- if writeEntity.Metadata.ModRevision !=
traceCache.GetMetadata().GetModRevision() {
- return errors.New("expired trace schema")
+func (s *traceService) navigate(metadata *commonv1.Metadata,
+ writeRequest *tracev1.WriteRequest, specLocator *traceSpecLocator,
+) (common.ShardID, error) {
+ shardCount, existed := s.groupRepo.shardNum(metadata.GetGroup())
+ if !existed {
+ return 0, errors.Wrapf(errNotExist, "finding the shard num by:
%v", metadata)
+ }
+ id := getID(metadata)
+ traceEntity, existed := s.entityRepo.getTrace(id)
+ if !existed {
+ return 0, errors.Wrapf(errNotExist, "finding trace schema by:
%v", metadata)
+ }
+
+ var traceID string
+ var err error
+ if specLocator != nil && specLocator.traceIDIndex >= 0 {
+ tags := writeRequest.GetTags()
+ if specLocator.traceIDIndex < len(tags) {
+ traceID, err =
extractTraceIDFromTagValue(tags[specLocator.traceIDIndex])
+ if err == nil {
+ return s.shardID(traceID, shardCount), nil
+ }
}
}
- return nil
+
+ traceIDIndex, existed := s.entityRepo.getTraceIDIndex(id)
+ if !existed || traceIDIndex == -1 {
+ return 0, errors.New("trace ID tag not found in schema: " +
traceEntity.GetTraceIdTagName())
+ }
+ traceID, err = s.extractTraceID(writeRequest.GetTags(), traceIDIndex)
+ if err != nil {
+ return 0, err
+ }
+ return s.shardID(traceID, shardCount), nil
+}
+
+func (s *traceService) shardID(traceID string, shardCount uint32) common.ShardID {
+	hash := convert.Hash([]byte(traceID))
+	return common.ShardID(hash % uint64(shardCount))
+}
func (s *traceService) extractTraceID(tags []*modelv1.TagValue, traceIDIndex
int) (string, error) {
if len(tags) == 0 {
return "", errors.New("no tags found")
}
-
if traceIDIndex < 0 || traceIDIndex >= len(tags) {
return "", errors.New("trace ID tag index out of range")
}
+ return extractTraceIDFromTagValue(tags[traceIDIndex])
+}
- tag := tags[traceIDIndex]
+func extractTraceIDFromTagValue(tag *modelv1.TagValue) (string, error) {
switch v := tag.GetValue().(type) {
case *modelv1.TagValue_Str:
return v.Str.GetValue(), nil
@@ -129,45 +216,16 @@ func (s *traceService) extractTraceID(tags
[]*modelv1.TagValue, traceIDIndex int
}
}
-func (s *traceService) getTraceShardID(writeEntity *tracev1.WriteRequest)
(common.ShardID, error) {
- // Get shard count from group configuration
- shardCount, existed :=
s.groupRepo.shardNum(writeEntity.GetMetadata().GetGroup())
- if !existed {
- return 0, errors.New("group not found or no shard
configuration")
- }
-
- // Get cached trace ID index from entityRepo
- id := getID(writeEntity.GetMetadata())
- traceIDIndex, existed := s.entityRepo.getTraceIDIndex(id)
- if !existed {
- return 0, errors.New("trace schema not found")
- }
-
- if traceIDIndex == -1 {
- return 0, errors.New("trace ID tag not found in schema")
- }
-
- traceID, err := s.extractTraceID(writeEntity.GetTags(), traceIDIndex)
- if err != nil {
- return 0, err
- }
-
- // Calculate shard ID using hash of trace ID
- hasher := fnv.New32a()
- hasher.Write([]byte(traceID))
- hash := hasher.Sum32()
-
- return common.ShardID(hash % shardCount), nil
-}
-
-func (s *traceService) getTraceShardIDWithRetry(writeEntity
*tracev1.WriteRequest) (common.ShardID, error) {
+func (s *traceService) navigateWithRetry(writeEntity *tracev1.WriteRequest,
metadata *commonv1.Metadata,
+ specLocator *traceSpecLocator,
+) (shardID common.ShardID, err error) {
if s.maxWaitDuration > 0 {
retryInterval := 10 * time.Millisecond
startTime := time.Now()
for {
- shardID, err := s.getTraceShardID(writeEntity)
+ shardID, err = s.navigate(metadata, writeEntity,
specLocator)
if err == nil || !errors.Is(err, errNotExist) ||
time.Since(startTime) > s.maxWaitDuration {
- return shardID, err
+ return
}
time.Sleep(retryInterval)
retryInterval = time.Duration(float64(retryInterval) *
1.5)
@@ -176,24 +234,42 @@ func (s *traceService)
getTraceShardIDWithRetry(writeEntity *tracev1.WriteReques
}
}
}
- return s.getTraceShardID(writeEntity)
+ return s.navigate(metadata, writeEntity, specLocator)
}
func (s *traceService) publishMessages(
ctx context.Context,
publisher queue.BatchPublisher,
writeEntity *tracev1.WriteRequest,
+ metadata *commonv1.Metadata,
+ spec *tracev1.TagSpec,
shardID common.ShardID,
+ nodeMetadataSent map[string]bool,
+ nodeSpecSent map[string]bool,
) ([]string, error) {
- iwr := &tracev1.InternalWriteRequest{
- ShardId: uint32(shardID),
- Request: writeEntity,
- }
- nodeID, err :=
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(),
writeEntity.GetMetadata().GetName(), uint32(shardID), 0)
+ nodeID, err := s.nodeRegistry.Locate(metadata.GetGroup(),
metadata.GetName(), uint32(shardID), 0)
if err != nil {
return nil, err
}
+ requestToSend := &tracev1.WriteRequest{
+ Version: writeEntity.GetVersion(),
+ Tags: writeEntity.GetTags(),
+ Span: writeEntity.GetSpan(),
+ }
+ if !nodeMetadataSent[nodeID] {
+ requestToSend.Metadata = metadata
+ nodeMetadataSent[nodeID] = true
+ }
+ if spec != nil && !nodeSpecSent[nodeID] {
+ requestToSend.TagSpec = spec
+ nodeSpecSent[nodeID] = true
+ }
+ iwr := &tracev1.InternalWriteRequest{
+ ShardId: uint32(shardID),
+ Request: requestToSend,
+ }
+
message :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
if _, err := publisher.Publish(ctx, data.TopicTraceWrite, message); err
!= nil {
return nil, err
@@ -201,20 +277,24 @@ func (s *traceService) publishMessages(
return []string{nodeID}, nil
}
-func (s *traceService) Write(stream tracev1.TraceService_WriteServer) error {
- reply := func(metadata *commonv1.Metadata, status modelv1.Status,
version uint64, stream tracev1.TraceService_WriteServer, logger *logger.Logger)
{
- if status != modelv1.Status_STATUS_SUCCEED {
- s.metrics.totalStreamMsgReceivedErr.Inc(1,
metadata.Group, "trace", "write")
- }
- s.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "trace",
"write")
- if errResp := stream.Send(&tracev1.WriteResponse{Metadata:
metadata, Status: status.String(), Version: version}); errResp != nil {
- if dl := logger.Debug(); dl.Enabled() {
- dl.Err(errResp).Msg("failed to send trace write
response")
- }
- s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group,
"trace", "write")
+func (s *traceService) sendReply(metadata *commonv1.Metadata, status
modelv1.Status, version uint64, stream tracev1.TraceService_WriteServer) {
+ if metadata == nil {
+ s.l.Error().Stringer("status", status).Msg("metadata is nil,
cannot send reply")
+ return
+ }
+ if status != modelv1.Status_STATUS_SUCCEED {
+ s.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group,
"trace", "write")
+ }
+ s.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "trace", "write")
+ if errResp := stream.Send(&tracev1.WriteResponse{Metadata: metadata,
Status: status.String(), Version: version}); errResp != nil {
+ if dl := s.l.Debug(); dl.Enabled() {
+ dl.Err(errResp).Msg("failed to send trace write
response")
}
+ s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "trace",
"write")
}
+}
+func (s *traceService) Write(stream tracev1.TraceService_WriteServer) error {
s.metrics.totalStreamStarted.Inc(1, "trace", "write")
publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
start := time.Now()
@@ -232,7 +312,7 @@ func (s *traceService) Write(stream
tracev1.TraceService_WriteServer) error {
}
}
}
- reply(ssm.metadata, code, ssm.messageID, stream, s.l)
+ s.sendReply(ssm.metadata, code, ssm.messageID, stream)
}
if err != nil {
s.l.Error().Err(err).Msg("failed to close the
publisher")
@@ -245,6 +325,14 @@ func (s *traceService) Write(stream
tracev1.TraceService_WriteServer) error {
}()
ctx := stream.Context()
+
+ var metadata *commonv1.Metadata
+ var spec *tracev1.TagSpec
+ var specLocator *traceSpecLocator
+ isFirstRequest := true
+ nodeMetadataSent := make(map[string]bool)
+ nodeSpecSent := make(map[string]bool)
+
for {
select {
case <-ctx.Done():
@@ -263,30 +351,37 @@ func (s *traceService) Write(stream
tracev1.TraceService_WriteServer) error {
return err
}
- requestCount++
- s.metrics.totalStreamMsgReceived.Inc(1,
writeEntity.Metadata.Group, "trace", "write")
-
- if err = s.validateTimestamp(writeEntity); err != nil {
- reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetVersion(), stream, s.l)
- continue
+ if writeEntity.GetMetadata() != nil {
+ metadata = writeEntity.GetMetadata()
+ nodeMetadataSent = make(map[string]bool)
+ specLocator = nil
+ } else if isFirstRequest {
+ s.l.Error().Msg("metadata is required for the first
request of gRPC stream")
+ s.sendReply(nil,
modelv1.Status_STATUS_METADATA_REQUIRED, writeEntity.GetVersion(), stream)
+ return errors.New("metadata is required for the first
request of gRPC stream")
}
-
- if err = s.validateMetadata(writeEntity); err != nil {
- status := modelv1.Status_STATUS_INTERNAL_ERROR
- if errors.Is(err, errors.New("trace schema not found"))
{
- status = modelv1.Status_STATUS_NOT_FOUND
- } else if errors.Is(err, errors.New("expired trace
schema")) {
- status = modelv1.Status_STATUS_EXPIRED_SCHEMA
+ isFirstRequest = false
+ if writeEntity.GetTagSpec() != nil {
+ spec = writeEntity.GetTagSpec()
+ nodeSpecSent = make(map[string]bool)
+ id := getID(metadata)
+ traceEntity, existed := s.entityRepo.getTrace(id)
+ if existed {
+ specLocator = newTraceSpecLocator(spec,
traceEntity.GetTraceIdTagName(), traceEntity.GetTimestampTagName())
}
- s.l.Error().Err(err).Stringer("written",
writeEntity).Msg("metadata validation failed")
- reply(writeEntity.GetMetadata(), status,
writeEntity.GetVersion(), stream, s.l)
+ }
+
+ requestCount++
+ s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group,
"trace", "write")
+
+ if s.validateWriteRequest(writeEntity, metadata, specLocator,
stream) != modelv1.Status_STATUS_SUCCEED {
continue
}
- shardID, err := s.getTraceShardIDWithRetry(writeEntity)
+ shardID, err := s.navigateWithRetry(writeEntity, metadata,
specLocator)
if err != nil {
- s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("trace sharding failed")
- reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream, s.l)
+ s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("navigation failed")
+ s.sendReply(metadata,
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream)
continue
}
@@ -296,15 +391,15 @@ func (s *traceService) Write(stream
tracev1.TraceService_WriteServer) error {
}
}
- nodes, err := s.publishMessages(ctx, publisher, writeEntity,
shardID)
+ nodes, err := s.publishMessages(ctx, publisher, writeEntity,
metadata, spec, shardID, nodeMetadataSent, nodeSpecSent)
if err != nil {
s.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEntity)).Msg("publishing failed")
- reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream, s.l)
+ s.sendReply(metadata,
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetVersion(), stream)
continue
}
succeedSent = append(succeedSent, succeedSentMessage{
- metadata: writeEntity.GetMetadata(),
+ metadata: metadata,
messageID: writeEntity.GetVersion(),
nodes: nodes,
})
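The trace path drops its inline FNV hashing in favor of convert.Hash, but the routing rule is unchanged: shard = hash(traceID) mod shardCount, so all spans of one trace land on the same shard. An illustrative stand-in (FNV-1a here is for demonstration only; convert.Hash's actual function is not shown in this diff):

package main

import (
	"fmt"
	"hash/fnv"
)

// shardID maps a trace ID deterministically onto [0, shardCount).
func shardID(traceID string, shardCount uint32) uint32 {
	h := fnv.New64a()
	h.Write([]byte(traceID))                      // hash the trace ID
	return uint32(h.Sum64() % uint64(shardCount)) // wrap into the shard range
}

func main() {
	fmt.Println(shardID("f8a2c1", 3)) // the same trace ID always yields the same shard
}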
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index a9247c6a..d5283e53 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -24,6 +24,7 @@ import (
"fmt"
"math/big"
"path"
+ "strings"
"sync"
"time"
@@ -270,7 +271,12 @@ func (e *etcdSchemaRegistry) prependNamespace(key string) string {
	if e.namespace == "" {
		return key
	}
-	return path.Join("/", e.namespace, key)
+	hasTrailingSlash := strings.HasSuffix(key, "/")
+	result := path.Join("/", e.namespace, key)
+	if hasTrailingSlash && !strings.HasSuffix(result, "/") {
+		result += "/"
+	}
+	return result
}
func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.Message) error {
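The prependNamespace fix matters because path.Join cleans its result and silently drops a trailing slash, while etcd prefix scans treat "/ns/streams" and "/ns/streams/" differently (the former would also match sibling keys such as "/ns/streamsfoo"). A runnable check of both behaviors, reproducing the fix above:

package main

import (
	"fmt"
	"path"
	"strings"
)

// prependNamespace restores the trailing slash that path.Join removes.
func prependNamespace(namespace, key string) string {
	if namespace == "" {
		return key
	}
	hasTrailingSlash := strings.HasSuffix(key, "/")
	result := path.Join("/", namespace, key)
	if hasTrailingSlash && !strings.HasSuffix(result, "/") {
		result += "/"
	}
	return result
}

func main() {
	fmt.Println(path.Join("/", "ns", "streams/"))   // "/ns/streams" -- slash lost
	fmt.Println(prependNamespace("ns", "streams/")) // "/ns/streams/" -- preserved
}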
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index be566f09..b86f48d1 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
	"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -69,13 +70,15 @@ func (w *writeQueueCallback) CheckHealth() *common.Error {
return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "disk
usage is too high, stop writing")
}
-func (w *writeQueueCallback) handle(dst map[string]*elementsInQueue,
writeEvent *streamv1.InternalWriteRequest) (map[string]*elementsInQueue, error)
{
+func (w *writeQueueCallback) handle(dst map[string]*elementsInQueue,
writeEvent *streamv1.InternalWriteRequest,
+ metadata *commonv1.Metadata, spec []*streamv1.TagFamilySpec,
+) (map[string]*elementsInQueue, error) {
t := writeEvent.Request.Element.Timestamp.AsTime().Local()
if err := timestamp.Check(t); err != nil {
return nil, fmt.Errorf("invalid timestamp: %w", err)
}
ts := t.UnixNano()
- eq, err := w.prepareElementsInQueue(dst, writeEvent)
+ eq, err := w.prepareElementsInQueue(dst, metadata)
if err != nil {
return nil, err
}
@@ -83,15 +86,15 @@ func (w *writeQueueCallback) handle(dst
map[string]*elementsInQueue, writeEvent
if err != nil {
return nil, err
}
- err = processElements(w.schemaRepo, et.elements, writeEvent, ts,
&et.docs, &et.seriesDocs)
+ err = processElements(w.schemaRepo, et.elements, writeEvent, ts,
&et.docs, &et.seriesDocs, metadata, spec)
if err != nil {
return nil, err
}
return dst, nil
}
-func (w *writeQueueCallback) prepareElementsInQueue(dst
map[string]*elementsInQueue, writeEvent *streamv1.InternalWriteRequest)
(*elementsInQueue, error) {
- gn := writeEvent.Request.Metadata.Group
+func (w *writeQueueCallback) prepareElementsInQueue(dst
map[string]*elementsInQueue, metadata *commonv1.Metadata) (*elementsInQueue,
error) {
+ gn := metadata.Group
queue, err := w.schemaRepo.loadQueue(gn)
if err != nil {
return nil, fmt.Errorf("cannot load queue for group %s: %w",
gn, err)
@@ -155,6 +158,8 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
return
}
groups := make(map[string]*elementsInQueue)
+ var metadata *commonv1.Metadata
+ var spec []*streamv1.TagFamilySpec
for i := range events {
var writeEvent *streamv1.InternalWriteRequest
switch e := events[i].(type) {
@@ -170,8 +175,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
w.l.Warn().Msg("invalid event data type")
continue
}
+ req := writeEvent.Request
+ if req != nil && req.GetMetadata() != nil {
+ metadata = req.GetMetadata()
+ }
+ if req != nil && req.GetTagFamilySpec() != nil {
+ spec = req.GetTagFamilySpec()
+ }
var err error
- if groups, err = w.handle(groups, writeEvent); err != nil {
+ if groups, err = w.handle(groups, writeEvent, metadata, spec);
err != nil {
w.l.Error().Err(err).Msg("cannot handle write event")
groups = make(map[string]*elementsInQueue)
continue
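On the queue side, the Rev loop above carries the last non-nil metadata and spec forward across the batch, mirroring the liaison's send-once behavior: events that omit them inherit the most recent values. A toy sketch of that sticky-state pattern:

package main

import "fmt"

// Each event either carries new state or inherits the last one seen, so a
// batch where only the first event has metadata still handles every event.
func main() {
	// "" stands for an event whose request omitted metadata/spec.
	events := []string{"meta-v1", "", "", "meta-v2", ""}
	var current string
	for _, e := range events {
		if e != "" {
			current = e // the latest non-empty value wins
		}
		fmt.Printf("handle event using %q\n", current)
	}
}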
diff --git a/banyand/stream/write_standalone.go
b/banyand/stream/write_standalone.go
index b2b69771..c266cffa 100644
--- a/banyand/stream/write_standalone.go
+++ b/banyand/stream/write_standalone.go
@@ -26,6 +26,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/apache/skywalking-banyandb/api/common"
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
@@ -69,13 +70,14 @@ func (w *writeCallback) CheckHealth() *common.Error {
}
func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent
*streamv1.InternalWriteRequest,
+ metadata *commonv1.Metadata, spec []*streamv1.TagFamilySpec,
) (map[string]*elementsInGroup, error) {
t := writeEvent.Request.Element.Timestamp.AsTime().Local()
if err := timestamp.Check(t); err != nil {
return nil, fmt.Errorf("invalid timestamp: %w", err)
}
ts := t.UnixNano()
- eg, err := w.prepareElementsInGroup(dst, writeEvent, ts)
+ eg, err := w.prepareElementsInGroup(dst, metadata, ts)
if err != nil {
return nil, err
}
@@ -83,15 +85,15 @@ func (w *writeCallback) handle(dst
map[string]*elementsInGroup, writeEvent *stre
if err != nil {
return nil, err
}
- err = processElements(w.schemaRepo, et.elements, writeEvent, ts,
&et.docs, &et.seriesDocs)
+ err = processElements(w.schemaRepo, et.elements, writeEvent, ts,
&et.docs, &et.seriesDocs, metadata, spec)
if err != nil {
return nil, err
}
return dst, nil
}
-func (w *writeCallback) prepareElementsInGroup(dst
map[string]*elementsInGroup, writeEvent *streamv1.InternalWriteRequest, ts
int64) (*elementsInGroup, error) {
- gn := writeEvent.Request.Metadata.Group
+func (w *writeCallback) prepareElementsInGroup(dst
map[string]*elementsInGroup, metadata *commonv1.Metadata, ts int64)
(*elementsInGroup, error) {
+ gn := metadata.Group
tsdb, err := w.schemaRepo.loadTSDB(gn)
if err != nil {
return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn,
err)
@@ -161,17 +163,17 @@ func (w *writeCallback) prepareElementsInTable(eg
*elementsInGroup, writeEvent *
}
func processElements(schemaRepo *schemaRepo, elements *elements, writeEvent
*streamv1.InternalWriteRequest,
- ts int64, tableDocs *index.Documents, seriesDocs *seriesDoc,
+ ts int64, tableDocs *index.Documents, seriesDocs *seriesDoc, metadata
*commonv1.Metadata, spec []*streamv1.TagFamilySpec,
) error {
req := writeEvent.Request
elements.timestamps = append(elements.timestamps, ts)
- eID := convert.HashStr(req.Metadata.Group + "|" + req.Metadata.Name +
"|" + req.Element.ElementId)
+ eID := convert.HashStr(metadata.Group + "|" + metadata.Name + "|" +
req.Element.ElementId)
elements.elementIDs = append(elements.elementIDs, eID)
- stm, ok := schemaRepo.loadStream(writeEvent.GetRequest().GetMetadata())
+ stm, ok := schemaRepo.loadStream(metadata)
if !ok {
- return fmt.Errorf("cannot find stream definition: %s",
writeEvent.GetRequest().GetMetadata())
+ return fmt.Errorf("cannot find stream definition: %s", metadata)
}
fLen := len(req.Element.GetTagFamilies())
@@ -179,11 +181,11 @@ func processElements(schemaRepo *schemaRepo, elements
*elements, writeEvent *str
return fmt.Errorf("%s has no tag family", req)
}
if fLen > len(stm.schema.GetTagFamilies()) {
- return fmt.Errorf("%s has more tag families than %s",
req.Metadata, stm.schema)
+ return fmt.Errorf("%s has more tag families than %s", metadata,
stm.schema)
}
series := &pbv1.Series{
- Subject: req.Metadata.Name,
+ Subject: metadata.Name,
EntityValues: writeEvent.EntityValues,
}
if err := series.Marshal(); err != nil {
@@ -200,28 +202,37 @@ func processElements(schemaRepo *schemaRepo, elements
*elements, writeEvent *str
len(is.indexRuleLocators.TagFamilyTRule),
len(stm.GetSchema().GetTagFamilies()))
}
+ specFamilyMap, specTagMaps := buildSpecMaps(spec)
+
for i := range stm.GetSchema().GetTagFamilies() {
- var tagFamily *modelv1.TagFamilyForWrite
- if len(req.Element.TagFamilies) <= i {
- tagFamily = pbv1.NullTagFamily
- } else {
- tagFamily = req.Element.TagFamilies[i]
- }
- tfr := is.indexRuleLocators.TagFamilyTRule[i]
tagFamilySpec := stm.GetSchema().GetTagFamilies()[i]
+		srcFamily, specTagMap := getSrcFamilyAndTagMap(spec, tagFamilySpec, req, i, specFamilyMap, specTagMaps)
+ tfr := is.indexRuleLocators.TagFamilyTRule[i]
tf := tagValues{
tag: tagFamilySpec.Name,
}
for j := range tagFamilySpec.Tags {
+ t := tagFamilySpec.Tags[j]
+
			var tagValue *modelv1.TagValue
-			if tagFamily == pbv1.NullTagFamily || len(tagFamily.Tags) <= j {
-				tagValue = pbv1.NullTagValue
+			if spec != nil {
+				if srcFamily != nil && specTagMap != nil {
+					if srcTagIdx, ok := specTagMap[t.Name]; ok && srcTagIdx < len(srcFamily.Tags) {
+						tagValue = srcFamily.Tags[srcTagIdx]
+					}
+				}
+				if tagValue == nil {
+					tagValue = pbv1.NullTagValue
+				}
 			} else {
-				tagValue = tagFamily.Tags[j]
+				if srcFamily == nil || len(srcFamily.Tags) <= j {
+					tagValue = pbv1.NullTagValue
+				} else {
+					tagValue = srcFamily.Tags[j]
+				}
 			}
-			t := tagFamilySpec.Tags[j]
indexed := false
if r, ok := tfr[t.Name]; ok && tagValue !=
pbv1.NullTagValue {
if r.GetType() ==
databasev1.IndexRule_TYPE_INVERTED {
@@ -277,6 +288,8 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
return
}
groups := make(map[string]*elementsInGroup)
+ var metadata *commonv1.Metadata
+ var spec []*streamv1.TagFamilySpec
for i := range events {
var writeEvent *streamv1.InternalWriteRequest
switch e := events[i].(type) {
@@ -292,8 +305,15 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
w.l.Warn().Msg("invalid event data type")
continue
}
+ req := writeEvent.Request
+ if req != nil && req.GetMetadata() != nil {
+ metadata = req.GetMetadata()
+ }
+ if req != nil && req.GetTagFamilySpec() != nil {
+ spec = req.GetTagFamilySpec()
+ }
var err error
- if groups, err = w.handle(groups, writeEvent); err != nil {
+		if groups, err = w.handle(groups, writeEvent, metadata, spec); err != nil {
w.l.Error().Err(err).Msg("cannot handle write event")
groups = make(map[string]*elementsInGroup)
continue
@@ -327,6 +347,41 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
return
}
+func buildSpecMaps(spec []*streamv1.TagFamilySpec) (map[string]int, map[string]map[string]int) {
+ if spec == nil {
+ return nil, nil
+ }
+ specFamilyMap := make(map[string]int)
+ specTagMaps := make(map[string]map[string]int)
+ for i, specFamily := range spec {
+ specFamilyMap[specFamily.GetName()] = i
+ tagMap := make(map[string]int)
+ for j, tagName := range specFamily.GetTagNames() {
+ tagMap[tagName] = j
+ }
+ specTagMaps[specFamily.GetName()] = tagMap
+ }
+ return specFamilyMap, specTagMaps
+}
+
+func getSrcFamilyAndTagMap(spec []*streamv1.TagFamilySpec, tagFamilySpec *databasev1.TagFamilySpec,
+ req *streamv1.WriteRequest, i int, specFamilyMap map[string]int,
+ specTagMaps map[string]map[string]int,
+) (*modelv1.TagFamilyForWrite, map[string]int) {
+ var srcFamily *modelv1.TagFamilyForWrite
+ var specTagMap map[string]int
+ if spec != nil {
+ specIdx, ok := specFamilyMap[tagFamilySpec.Name]
+ if ok && specIdx < len(req.Element.TagFamilies) {
+ srcFamily = req.Element.TagFamilies[specIdx]
+ }
+ specTagMap = specTagMaps[tagFamilySpec.Name]
+ } else if len(req.Element.TagFamilies) > i {
+ srcFamily = req.Element.TagFamilies[i]
+ }
+ return srcFamily, specTagMap
+}
+
func encodeTagValue(name string, tagType databasev1.TagType, tagVal
*modelv1.TagValue) *tagValue {
tv := generateTagValue()
tv.tag = name
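The two helpers above (buildSpecMaps and getSrcFamilyAndTagMap) are what let processElements accept tag families in writer-defined order. A minimal, self-contained sketch of the same remapping idea, using plain strings instead of the protobuf types (every name below is illustrative, not the production API):

    package main

    import "fmt"

    func main() {
        schemaTags := []string{"trace_id", "state", "service_id"} // schema order
        specTagNames := []string{"service_id", "trace_id"}        // writer's order; "state" omitted
        srcValues := []string{"svc-1", "trace-42"}                // values arrive in spec order

        // Same shape as buildSpecMaps: tag name -> index in the writer's request.
        specIdx := make(map[string]int, len(specTagNames))
        for i, n := range specTagNames {
            specIdx[n] = i
        }
        // Walk the schema order, pulling each value from wherever the spec put it.
        for _, name := range schemaTags {
            if i, ok := specIdx[name]; ok && i < len(srcValues) {
                fmt.Printf("%s => %s\n", name, srcValues[i])
            } else {
                fmt.Printf("%s => <null>\n", name) // unlisted tags become NullTagValue
            }
        }
    }

Tags the spec does not mention resolve to null rather than erroring, which is what lets writers send sparse or reordered tag sets.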
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index 6bf27f41..a234cfab 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/sidx"
@@ -70,12 +71,15 @@ func (w *writeQueueCallback) CheckHealth() *common.Error {
 	return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "disk usage is too high, stop writing")
}
-func (w *writeQueueCallback) handle(dst map[string]*tracesInQueue, writeEvent *tracev1.InternalWriteRequest) (map[string]*tracesInQueue, error) {
-	stm, ok := w.schemaRepo.loadTrace(writeEvent.GetRequest().GetMetadata())
+func (w *writeQueueCallback) handle(dst map[string]*tracesInQueue, writeEvent *tracev1.InternalWriteRequest,
+	metadata *commonv1.Metadata, spec *tracev1.TagSpec,
+) (map[string]*tracesInQueue, error) {
+ stm, ok := w.schemaRepo.loadTrace(metadata)
if !ok {
- return nil, fmt.Errorf("cannot find trace definition: %s",
writeEvent.GetRequest().GetMetadata())
+ return nil, fmt.Errorf("cannot find trace definition: %s",
metadata)
}
- idx, err := getTagIndex(stm, stm.schema.TimestampTagName)
+ specMap := buildSpecMap(spec)
+ idx, err := getTagIndex(stm, stm.schema.TimestampTagName, specMap)
if err != nil {
return nil, err
}
@@ -84,7 +88,7 @@ func (w *writeQueueCallback) handle(dst
map[string]*tracesInQueue, writeEvent *t
return nil, fmt.Errorf("invalid timestamp: %w", err)
}
ts := t.UnixNano()
- eq, err := w.prepareElementsInQueue(dst, writeEvent)
+ eq, err := w.prepareElementsInQueue(dst, metadata)
if err != nil {
return nil, err
}
@@ -92,15 +96,15 @@ func (w *writeQueueCallback) handle(dst
map[string]*tracesInQueue, writeEvent *t
if err != nil {
return nil, err
}
- err = processTraces(w.schemaRepo, et, writeEvent)
+ err = processTraces(w.schemaRepo, et, writeEvent, metadata, spec)
if err != nil {
return nil, err
}
return dst, nil
}
-func (w *writeQueueCallback) prepareElementsInQueue(dst map[string]*tracesInQueue, writeEvent *tracev1.InternalWriteRequest) (*tracesInQueue, error) {
-	gn := writeEvent.Request.Metadata.Group
+func (w *writeQueueCallback) prepareElementsInQueue(dst map[string]*tracesInQueue, metadata *commonv1.Metadata) (*tracesInQueue, error) {
+	gn := metadata.Group
queue, err := w.schemaRepo.loadQueue(gn)
if err != nil {
return nil, fmt.Errorf("cannot load queue for group %s: %w",
gn, err)
@@ -165,6 +169,8 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
return
}
groups := make(map[string]*tracesInQueue)
+ var metadata *commonv1.Metadata
+ var spec *tracev1.TagSpec
for i := range events {
var writeEvent *tracev1.InternalWriteRequest
switch e := events[i].(type) {
@@ -180,8 +186,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
w.l.Warn().Msg("invalid event data type")
continue
}
+ req := writeEvent.Request
+ if req != nil && req.GetMetadata() != nil {
+ metadata = req.GetMetadata()
+ }
+ if req != nil && req.GetTagSpec() != nil {
+ spec = req.GetTagSpec()
+ }
var err error
- if groups, err = w.handle(groups, writeEvent); err != nil {
+		if groups, err = w.handle(groups, writeEvent, metadata, spec); err != nil {
w.l.Error().Err(err).Msg("cannot handle write event")
groups = make(map[string]*tracesInQueue)
continue
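Note the shape of the Rev loop above: metadata and spec are declared outside the event loop and refreshed only when a request actually carries them, so a batch's later requests inherit the most recent values. A stripped-down sketch of that carry-forward behavior (simplified stand-in types, not the real protobuf messages):

    package main

    import "fmt"

    type request struct {
        metadata string // empty means: reuse the previously seen metadata
        payload  string
    }

    func main() {
        events := []request{
            {metadata: "test-trace-spec/sw", payload: "e1"}, // first request names the target
            {payload: "e2"},                                 // later ones omit it
            {payload: "e3"},
        }
        var metadata string
        for _, e := range events {
            if e.metadata != "" {
                metadata = e.metadata // carry forward until replaced
            }
            fmt.Printf("handle %s with metadata %q\n", e.payload, metadata)
        }
    }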
diff --git a/banyand/trace/write_standalone.go
b/banyand/trace/write_standalone.go
index 18e09722..d7057662 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -26,6 +26,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/apache/skywalking-banyandb/api/common"
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
@@ -70,12 +71,14 @@ func (w *writeCallback) CheckHealth() *common.Error {
}
func (w *writeCallback) handle(dst map[string]*tracesInGroup, writeEvent
*tracev1.InternalWriteRequest,
+ metadata *commonv1.Metadata, spec *tracev1.TagSpec,
) (map[string]*tracesInGroup, error) {
- stm, ok := w.schemaRepo.loadTrace(writeEvent.GetRequest().GetMetadata())
+ stm, ok := w.schemaRepo.loadTrace(metadata)
if !ok {
- return nil, fmt.Errorf("cannot find trace definition: %s",
writeEvent.GetRequest().GetMetadata())
+ return nil, fmt.Errorf("cannot find trace definition: %s",
metadata)
}
- idx, err := getTagIndex(stm, stm.schema.TimestampTagName)
+ specMap := buildSpecMap(spec)
+ idx, err := getTagIndex(stm, stm.schema.TimestampTagName, specMap)
if err != nil {
return nil, err
}
@@ -84,7 +87,7 @@ func (w *writeCallback) handle(dst map[string]*tracesInGroup,
writeEvent *tracev
return nil, fmt.Errorf("invalid timestamp: %w", err)
}
ts := t.UnixNano()
- eg, err := w.prepareTracesInGroup(dst, writeEvent, ts)
+ eg, err := w.prepareTracesInGroup(dst, metadata, ts)
if err != nil {
return nil, err
}
@@ -92,15 +95,15 @@ func (w *writeCallback) handle(dst
map[string]*tracesInGroup, writeEvent *tracev
if err != nil {
return nil, err
}
- err = processTraces(w.schemaRepo, et, writeEvent)
+ err = processTraces(w.schemaRepo, et, writeEvent, metadata, spec)
if err != nil {
return nil, err
}
return dst, nil
}
-func (w *writeCallback) prepareTracesInGroup(dst map[string]*tracesInGroup, writeEvent *tracev1.InternalWriteRequest, ts int64) (*tracesInGroup, error) {
-	gn := writeEvent.Request.Metadata.Group
+func (w *writeCallback) prepareTracesInGroup(dst map[string]*tracesInGroup, metadata *commonv1.Metadata, ts int64) (*tracesInGroup, error) {
+	gn := metadata.Group
tsdb, err := w.schemaRepo.loadTSDB(gn)
if err != nil {
return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn,
err)
@@ -170,15 +173,15 @@ func (w *writeCallback) prepareTracesInTable(eg
*tracesInGroup, writeEvent *trac
return et, nil
}
-func extractTraceSpanInfo(stm *trace, tracesInTable *tracesInTable, req *tracev1.WriteRequest) (string, error) {
-	idx, err := getTagIndex(stm, stm.schema.TraceIdTagName)
+func extractTraceSpanInfo(stm *trace, tracesInTable *tracesInTable, req *tracev1.WriteRequest, specMap map[string]int) (string, error) {
+	idx, err := getTagIndex(stm, stm.schema.TraceIdTagName, specMap)
if err != nil {
return "", err
}
traceID := req.Tags[idx].GetStr().GetValue()
tracesInTable.traces.traceIDs = append(tracesInTable.traces.traceIDs,
traceID)
- idx, err = getTagIndex(stm, stm.schema.SpanIdTagName)
+ idx, err = getTagIndex(stm, stm.schema.SpanIdTagName, specMap)
if err != nil {
return "", err
}
@@ -189,13 +192,19 @@ func extractTraceSpanInfo(stm *trace, tracesInTable
*tracesInTable, req *tracev1
return traceID, nil
}
-func validateTags(stm *trace, req *tracev1.WriteRequest) error {
+func validateTags(stm *trace, req *tracev1.WriteRequest, spec *tracev1.TagSpec) error {
 	tLen := len(req.GetTags())
 	if tLen < 1 {
 		return fmt.Errorf("%s has no tag family", req)
 	}
-	if tLen > len(stm.schema.GetTags()) {
-		return fmt.Errorf("%s has more tag than %s", req.Metadata, stm.schema)
+	if spec != nil {
+		if tLen > len(spec.GetTagNames()) {
+			return fmt.Errorf("request has more tags than spec: %d > %d", tLen, len(spec.GetTagNames()))
+		}
+	} else {
+		if tLen > len(stm.schema.GetTags()) {
+			return fmt.Errorf("%s has more tags than %s", req.Metadata, stm.schema)
+		}
 	}
is := stm.indexSchema.Load().(indexSchema)
@@ -207,7 +216,7 @@ func validateTags(stm *trace, req *tracev1.WriteRequest)
error {
return nil
}
-func buildTagsAndMap(stm *trace, tracesInTable *tracesInTable, req *tracev1.WriteRequest) ([]*tagValue, map[string]*tagValue) {
+func buildTagsAndMap(stm *trace, tracesInTable *tracesInTable, req *tracev1.WriteRequest, specMap map[string]int) ([]*tagValue, map[string]*tagValue) {
tags := make([]*tagValue, 0, len(stm.schema.Tags))
tagMap := make(map[string]*tagValue, len(stm.schema.Tags))
tagSpecs := stm.GetSchema().GetTags()
@@ -217,15 +226,15 @@ func buildTagsAndMap(stm *trace, tracesInTable
*tracesInTable, req *tracev1.Writ
if tagSpec.Name == stm.schema.TraceIdTagName || tagSpec.Name ==
stm.schema.SpanIdTagName {
continue
}
-		if tagSpec.Name == stm.schema.TimestampTagName {
-			tracesInTable.traces.timestamps = append(tracesInTable.traces.timestamps, req.Tags[i].GetTimestamp().AsTime().UnixNano())
-		}
-
+		tagIdx, err := getTagIndex(stm, tagSpec.Name, specMap)
 		var tagValue *modelv1.TagValue
-		if len(req.Tags) <= i {
+		if err != nil || tagIdx >= len(req.Tags) {
 			tagValue = pbv1.NullTagValue
 		} else {
-			tagValue = req.Tags[i]
+			tagValue = req.Tags[tagIdx]
+		}
+		if tagSpec.Name == stm.schema.TimestampTagName && tagValue != pbv1.NullTagValue {
+			tracesInTable.traces.timestamps = append(tracesInTable.traces.timestamps, tagValue.GetTimestamp().AsTime().UnixNano())
 		}
tv := encodeTagValue(tagSpec.Name, tagSpec.Type, tagValue)
tags = append(tags, tv)
@@ -256,11 +265,13 @@ func buildSidxTags(tags []*tagValue) []sidx.Tag {
return sidxTags
}
-func processIndexRules(stm *trace, tracesInTable *tracesInTable, req *tracev1.WriteRequest, traceID string, tagMap map[string]*tagValue, sidxTags []sidx.Tag) error {
+func processIndexRules(stm *trace, tracesInTable *tracesInTable, req *tracev1.WriteRequest,
+	traceID string, tagMap map[string]*tagValue, sidxTags []sidx.Tag, metadata *commonv1.Metadata, specMap map[string]int,
+) error {
indexRules := stm.GetIndexRules()
for _, indexRule := range indexRules {
tagName := indexRule.Tags[len(indexRule.Tags)-1]
- tagIdx, err := getTagIndex(stm, tagName)
+ tagIdx, err := getTagIndex(stm, tagName, specMap)
if err != nil || tagIdx >= len(req.Tags) {
continue
}
@@ -283,7 +294,7 @@ func processIndexRules(stm *trace, tracesInTable
*tracesInTable, req *tracev1.Wr
entityValues := make([]*modelv1.TagValue, 0,
len(indexRule.Tags))
for i, tagName := range indexRule.Tags {
- tagIdx, err := getTagIndex(stm, tagName)
+ tagIdx, err := getTagIndex(stm, tagName, specMap)
if err != nil || tagIdx >= len(req.Tags) {
continue
}
@@ -294,7 +305,7 @@ func processIndexRules(stm *trace, tracesInTable
*tracesInTable, req *tracev1.Wr
}
series := &pbv1.Series{
- Subject: req.Metadata.Name,
+ Subject: metadata.Name,
EntityValues: entityValues,
}
if err := series.Marshal(); err != nil {
@@ -348,26 +359,25 @@ func processIndexRules(stm *trace, tracesInTable
*tracesInTable, req *tracev1.Wr
return nil
}
-func processTraces(schemaRepo *schemaRepo, tracesInTable *tracesInTable, writeEvent *tracev1.InternalWriteRequest) error {
+func processTraces(schemaRepo *schemaRepo, tracesInTable *tracesInTable, writeEvent *tracev1.InternalWriteRequest,
+	metadata *commonv1.Metadata, spec *tracev1.TagSpec,
+) error {
req := writeEvent.Request
- stm, ok := schemaRepo.loadTrace(req.GetMetadata())
+ stm, ok := schemaRepo.loadTrace(metadata)
if !ok {
- return fmt.Errorf("cannot find trace definition: %s",
req.GetMetadata())
+ return fmt.Errorf("cannot find trace definition: %s", metadata)
}
-
- traceID, err := extractTraceSpanInfo(stm, tracesInTable, req)
+ specMap := buildSpecMap(spec)
+ traceID, err := extractTraceSpanInfo(stm, tracesInTable, req, specMap)
if err != nil {
return err
}
-
- if err := validateTags(stm, req); err != nil {
+ if err := validateTags(stm, req, spec); err != nil {
return err
}
-
- tags, tagMap := buildTagsAndMap(stm, tracesInTable, req)
+ tags, tagMap := buildTagsAndMap(stm, tracesInTable, req, specMap)
sidxTags := buildSidxTags(tags)
-
-	return processIndexRules(stm, tracesInTable, req, traceID, tagMap, sidxTags)
+	return processIndexRules(stm, tracesInTable, req, traceID, tagMap, sidxTags, metadata, specMap)
}
func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp
bus.Message) {
@@ -381,6 +391,8 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
return
}
groups := make(map[string]*tracesInGroup)
+ var metadata *commonv1.Metadata
+ var spec *tracev1.TagSpec
for i := range events {
var writeEvent *tracev1.InternalWriteRequest
switch e := events[i].(type) {
@@ -396,8 +408,15 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
w.l.Warn().Msg("invalid event data type")
continue
}
+ req := writeEvent.Request
+ if req != nil && req.GetMetadata() != nil {
+ metadata = req.GetMetadata()
+ }
+ if req != nil && req.GetTagSpec() != nil {
+ spec = req.GetTagSpec()
+ }
var err error
- if groups, err = w.handle(groups, writeEvent); err != nil {
+		if groups, err = w.handle(groups, writeEvent, metadata, spec); err != nil {
w.l.Error().Err(err).Msg("cannot handle write event")
groups = make(map[string]*tracesInGroup)
continue
@@ -444,6 +463,32 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
return
}
+func buildSpecMap(spec *tracev1.TagSpec) map[string]int {
+ if spec == nil {
+ return nil
+ }
+	specMap := make(map[string]int, len(spec.GetTagNames()))
+ for i, name := range spec.GetTagNames() {
+ specMap[name] = i
+ }
+ return specMap
+}
+
+func getTagIndex(trace *trace, name string, specMap map[string]int) (int, error) {
+ if specMap != nil {
+ if idx, ok := specMap[name]; ok {
+ return idx, nil
+ }
+ return -1, fmt.Errorf("tag %s not found in spec", name)
+ }
+ for i, tag := range trace.schema.Tags {
+ if tag.Name == name {
+ return i, nil
+ }
+ }
+ return -1, fmt.Errorf("tag %s not found in trace %s", name, trace.name)
+}
+
func encodeTagValue(name string, tagType databasev1.TagType, tagVal
*modelv1.TagValue) *tagValue {
tv := generateTagValue()
tv.tag = name
@@ -494,12 +539,3 @@ func encodeTagValue(name string, tagType
databasev1.TagType, tagVal *modelv1.Tag
}
return tv
}
-
-func getTagIndex(trace *trace, name string) (int, error) {
- for i, tag := range trace.schema.Tags {
- if tag.Name == name {
- return i, nil
- }
- }
- return -1, fmt.Errorf("tag %s not found in trace %s", name, trace.name)
-}
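getTagIndex now resolves a tag's position along two paths: the spec's declared order when the writer supplied a TagSpec, and a linear scan of the schema otherwise. A compact sketch of the same two-path lookup with plain slices and maps (names are illustrative):

    package main

    import "fmt"

    func tagIndex(schemaTags []string, specIdx map[string]int, name string) (int, error) {
        if specIdx != nil {
            if i, ok := specIdx[name]; ok {
                return i, nil // spec order wins when present
            }
            return -1, fmt.Errorf("tag %s not found in spec", name)
        }
        for i, t := range schemaTags {
            if t == name {
                return i, nil // fall back to schema order
            }
        }
        return -1, fmt.Errorf("tag %s not found in schema", name)
    }

    func main() {
        schema := []string{"trace_id", "state", "duration"}
        spec := map[string]int{"duration": 0, "trace_id": 1} // writer sent duration first
        fmt.Println(tagIndex(schema, spec, "duration"))      // 0 <nil>
        fmt.Println(tagIndex(schema, nil, "duration"))       // 2 <nil>
    }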
diff --git a/pkg/test/stream/testdata/group.json
b/pkg/test/stream/testdata/group.json
index e4116dd7..8e312f1c 100644
--- a/pkg/test/stream/testdata/group.json
+++ b/pkg/test/stream/testdata/group.json
@@ -18,6 +18,25 @@
},
"updated_at": "2021-04-15T01:30:15.01Z"
},
+ {
+ "metadata": {
+ "name": "default-spec"
+ },
+ "catalog": "CATALOG_STREAM",
+ "resource_opts": {
+ "shard_num": 2,
+ "replicas": 1,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ }
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+ },
{
"metadata": {
"name": "updated"
@@ -36,4 +55,4 @@
},
"updated_at": "2021-04-15T01:30:15.01Z"
}
-]
\ No newline at end of file
+]
diff --git a/pkg/test/stream/testdata/group_with_stages.json
b/pkg/test/stream/testdata/group_with_stages.json
index b5aad645..749bde93 100644
--- a/pkg/test/stream/testdata/group_with_stages.json
+++ b/pkg/test/stream/testdata/group_with_stages.json
@@ -64,5 +64,38 @@
]
},
"updated_at": "2021-04-15T01:30:15.01Z"
+ },
+ {
+ "metadata": {
+ "name": "default-spec"
+ },
+ "catalog": "CATALOG_STREAM",
+ "resource_opts": {
+ "shard_num": 2,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ },
+ "stages": [
+ {
+ "name": "warm",
+ "shard_num": 1,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ },
+ "node_selector": "type=warm"
+ }
+ ]
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
}
]
\ No newline at end of file
diff --git a/pkg/test/stream/testdata/index_rule_bindings/sw_spec.json
b/pkg/test/stream/testdata/index_rule_bindings/sw_spec.json
new file mode 100644
index 00000000..ba14f693
--- /dev/null
+++ b/pkg/test/stream/testdata/index_rule_bindings/sw_spec.json
@@ -0,0 +1,27 @@
+{
+ "metadata": {
+ "name": "sw-spec-index-rule-binding",
+ "group": "default-spec"
+ },
+ "rules": [
+ "trace_id",
+ "duration",
+ "endpoint_id",
+ "status_code",
+ "http.method",
+ "db.instance",
+ "db.type",
+ "mq.broker",
+ "mq.queue",
+ "mq.topic",
+ "extended_tags"
+ ],
+ "subject": {
+ "catalog": "CATALOG_STREAM",
+ "name": "sw"
+ },
+ "begin_at": "2021-04-15T01:30:15.01Z",
+ "expire_at": "2121-04-15T01:30:15.01Z",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
+
diff --git a/pkg/test/stream/testdata/streams/sw_spec.json
b/pkg/test/stream/testdata/streams/sw_spec.json
new file mode 100644
index 00000000..31b7b58f
--- /dev/null
+++ b/pkg/test/stream/testdata/streams/sw_spec.json
@@ -0,0 +1,95 @@
+{
+ "metadata": {
+ "name": "sw",
+ "group": "default-spec"
+ },
+ "tag_families": [
+ {
+ "name": "data",
+ "tags": [
+ {
+ "name": "data_binary",
+ "type": "TAG_TYPE_DATA_BINARY"
+ }
+ ]
+ },
+ {
+ "name": "searchable",
+ "tags": [
+ {
+ "name": "trace_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "state",
+ "type": "TAG_TYPE_INT"
+ },
+ {
+ "name": "service_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "service_instance_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "endpoint_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "duration",
+ "type": "TAG_TYPE_INT"
+ },
+ {
+ "name": "start_time",
+ "type": "TAG_TYPE_INT"
+ },
+ {
+ "name": "http.method",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "status_code",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "span_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "db.type",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "db.instance",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "mq.queue",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "mq.topic",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "mq.broker",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "extended_tags",
+ "type": "TAG_TYPE_STRING_ARRAY"
+ },
+ {
+ "name": "non_indexed_tags",
+ "type": "TAG_TYPE_STRING_ARRAY"
+ }
+ ]
+ }
+ ],
+ "entity": {
+ "tag_names": ["service_id", "service_instance_id", "state"]
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
+
diff --git a/pkg/test/trace/testdata/groups/test-trace-spec.json
b/pkg/test/trace/testdata/groups/test-trace-spec.json
new file mode 100644
index 00000000..29c24dd0
--- /dev/null
+++ b/pkg/test/trace/testdata/groups/test-trace-spec.json
@@ -0,0 +1,19 @@
+{
+ "metadata": {
+ "name": "test-trace-spec"
+ },
+ "catalog": "CATALOG_TRACE",
+ "resource_opts": {
+ "shard_num": 2,
+ "replicas": 0,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ }
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/pkg/test/trace/testdata/groups_stages/test-trace-spec.json
b/pkg/test/trace/testdata/groups_stages/test-trace-spec.json
new file mode 100644
index 00000000..8cd0290f
--- /dev/null
+++ b/pkg/test/trace/testdata/groups_stages/test-trace-spec.json
@@ -0,0 +1,34 @@
+{
+ "metadata": {
+ "name": "test-trace-spec"
+ },
+ "catalog": "CATALOG_TRACE",
+ "resource_opts": {
+ "shard_num": 2,
+ "replicas": 0,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ },
+ "stages": [
+ {
+ "name": "warm",
+ "shard_num": 2,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ },
+ "node_selector": "type=warm"
+ }
+ ]
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rule_bindings/sw_spec.json
b/pkg/test/trace/testdata/index_rule_bindings/sw_spec.json
new file mode 100644
index 00000000..810c1612
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rule_bindings/sw_spec.json
@@ -0,0 +1,14 @@
+{
+ "metadata": {
+ "name": "sw-spec-index-rule-binding",
+ "group": "test-trace-spec"
+ },
+ "rules": ["duration", "timestamp"],
+ "subject": {
+ "catalog": "CATALOG_TRACE",
+ "name": "sw"
+ },
+ "begin_at": "2021-04-15T01:30:15.01Z",
+ "expire_at": "2121-04-15T01:30:15.01Z",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rules/duration_spec.json
b/pkg/test/trace/testdata/index_rules/duration_spec.json
new file mode 100644
index 00000000..a89781ee
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rules/duration_spec.json
@@ -0,0 +1,14 @@
+{
+ "metadata": {
+ "name": "duration",
+ "group": "test-trace-spec"
+ },
+ "tags": [
+ "service_id",
+ "service_instance_id",
+ "state",
+ "duration"
+ ],
+ "type": "TYPE_TREE",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rules/timestamp_spec.json
b/pkg/test/trace/testdata/index_rules/timestamp_spec.json
new file mode 100644
index 00000000..ba1a0e27
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rules/timestamp_spec.json
@@ -0,0 +1,14 @@
+{
+ "metadata": {
+ "name": "timestamp",
+ "group": "test-trace-spec"
+ },
+ "tags": [
+ "service_id",
+ "service_instance_id",
+ "state",
+ "timestamp"
+ ],
+ "type": "TYPE_TREE",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/traces/sw_spec.json
b/pkg/test/trace/testdata/traces/sw_spec.json
new file mode 100644
index 00000000..9b0841b7
--- /dev/null
+++ b/pkg/test/trace/testdata/traces/sw_spec.json
@@ -0,0 +1,45 @@
+{
+ "metadata": {
+ "name": "sw",
+ "group": "test-trace-spec"
+ },
+ "tags": [
+ {
+ "name": "trace_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "state",
+ "type": "TAG_TYPE_INT"
+ },
+ {
+ "name": "service_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "service_instance_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "endpoint_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "duration",
+ "type": "TAG_TYPE_INT"
+ },
+ {
+ "name": "span_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "timestamp",
+ "type": "TAG_TYPE_TIMESTAMP"
+ }
+ ],
+ "trace_id_tag_name": "trace_id",
+ "span_id_tag_name": "span_id",
+ "timestamp_tag_name": "timestamp",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
+
diff --git a/test/cases/init.go b/test/cases/init.go
index dc66b595..8cc611e9 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -26,6 +26,8 @@ import (
"google.golang.org/grpc/credentials/insecure"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
 	"github.com/apache/skywalking-banyandb/pkg/grpchelper"
 	casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
 	caseproperty "github.com/apache/skywalking-banyandb/test/cases/property/data"
@@ -43,6 +45,33 @@ func Initialize(addr string, now time.Time) {
casesstreamdata.Write(conn, "sw", now, interval)
casesstreamdata.Write(conn, "duplicated", now, 0)
casesstreamdata.WriteToGroup(conn, "sw", "updated", "sw_updated",
now.Add(time.Minute), interval)
+ casesstreamdata.WriteWithSpec(conn, "sw", "default-spec",
now.Add(2*time.Minute), interval,
+ casesstreamdata.SpecWithData{
+ Spec: []*streamv1.TagFamilySpec{
+ {
+ Name: "data",
+ TagNames: []string{"data_binary"},
+ },
+ {
+ Name: "searchable",
+ TagNames: []string{"trace_id", "state",
"service_id", "service_instance_id", "endpoint_id", "duration", "start_time",
"http.method", "status_code", "span_id"},
+ },
+ },
+ DataFile: "sw_spec_order.json",
+ },
+ casesstreamdata.SpecWithData{
+ Spec: []*streamv1.TagFamilySpec{
+ {
+ Name: "searchable",
+ TagNames: []string{"span_id",
"status_code", "http.method", "duration", "state", "endpoint_id",
"service_instance_id", "start_time", "service_id", "trace_id"},
+ },
+ {
+ Name: "data",
+ TagNames: []string{"data_binary"},
+ },
+ },
+ DataFile: "sw_spec_order2.json",
+ })
// measure
interval = time.Minute
casesmeasuredata.Write(conn, "service_traffic", "index_mode",
"service_traffic_data_old.json", now.AddDate(0, 0, -2), interval)
@@ -107,6 +136,19 @@ func Initialize(addr string, now time.Time) {
casestrace.WriteToGroup(conn, "sw", "test-trace-updated", "sw_updated",
now.Add(time.Minute), interval)
time.Sleep(5 * time.Second)
casestrace.WriteToGroup(conn, "sw", "test-trace-group",
"sw_mixed_traces", now.Add(time.Minute), interval)
+ casestrace.WriteWithSpec(conn, "sw", "test-trace-spec",
now.Add(2*time.Minute), interval,
+ casestrace.SpecWithData{
+ Spec: &tracev1.TagSpec{
+ TagNames: []string{"trace_id", "state",
"service_id", "service_instance_id", "endpoint_id", "duration", "span_id",
"timestamp"},
+ },
+ DataFile: "sw_spec_order.json",
+ },
+ casestrace.SpecWithData{
+ Spec: &tracev1.TagSpec{
+ TagNames: []string{"span_id", "duration",
"endpoint_id", "service_instance_id", "service_id", "state", "trace_id",
"timestamp"},
+ },
+ DataFile: "sw_spec_order2.json",
+ })
// property
caseproperty.Write(conn, "sw1")
caseproperty.Write(conn, "sw2")
diff --git a/test/cases/stream/data/data.go b/test/cases/stream/data/data.go
index d99361ba..eb333960 100644
--- a/test/cases/stream/data/data.go
+++ b/test/cases/stream/data/data.go
@@ -274,3 +274,71 @@ func WriteToGroup(conn *grpclib.ClientConn, name, group,
fileName string, baseTi
return err
}, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
}
+
+// SpecWithData pairs a list of TagFamilySpecs with a data file.
+type SpecWithData struct {
+ DataFile string
+ Spec []*streamv1.TagFamilySpec
+}
+
+// WriteWithSpec writes stream data using multiple tag_family_specs to specify tag names.
+func WriteWithSpec(conn *grpclib.ClientConn, name, group string,
+	baseTime time.Time, interval time.Duration, specDataPairs ...SpecWithData,
+) {
+ ctx := context.Background()
+ md := &commonv1.Metadata{
+ Name: name,
+ Group: group,
+ }
+
+ schemaClient := databasev1.NewStreamRegistryServiceClient(conn)
+ resp, err := schemaClient.Get(ctx,
&databasev1.StreamRegistryServiceGetRequest{Metadata: md})
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ md = resp.GetStream().GetMetadata()
+
+ c := streamv1.NewStreamServiceClient(conn)
+ writeClient, err := c.Write(ctx)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ isFirstRequest := true
+ elementCounter := 0
+ currentTime := baseTime
+ for _, pair := range specDataPairs {
+ var templates []interface{}
+ content, err := dataFS.ReadFile("testdata/" + pair.DataFile)
+ gm.Expect(err).ShouldNot(gm.HaveOccurred())
+ gm.Expect(json.Unmarshal(content,
&templates)).ShouldNot(gm.HaveOccurred())
+
+ isFirstForSpec := true
+ for i, template := range templates {
+ rawElementValue, errMarshal := json.Marshal(template)
+ gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
+ elementValue := &streamv1.ElementValue{}
+ gm.Expect(protojson.Unmarshal(rawElementValue,
elementValue)).ShouldNot(gm.HaveOccurred())
+ elementValue.ElementId = strconv.Itoa(elementCounter)
+ elementValue.Timestamp =
timestamppb.New(currentTime.Add(time.Duration(i) * interval))
+
+ req := &streamv1.WriteRequest{
+ Element: elementValue,
+ MessageId: uint64(time.Now().UnixNano()),
+ }
+ if isFirstRequest {
+ req.Metadata = md
+ isFirstRequest = false
+ }
+ if isFirstForSpec {
+ req.TagFamilySpec = pair.Spec
+ isFirstForSpec = false
+ }
+ gm.Expect(writeClient.Send(req)).Should(gm.Succeed())
+ elementCounter++
+ }
+ currentTime = currentTime.Add(time.Duration(len(templates)) *
interval)
+ }
+
+ gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
+ gm.Eventually(func() error {
+ _, err := writeClient.Recv()
+ return err
+ }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
+}
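WriteWithSpec above attaches a fresh tag_family_spec only on the first element of each data file (isFirstForSpec), which is how a single write stream can switch tag orderings between batches. A simplified sketch of that batching logic (stand-in types, not the real client):

    package main

    import "fmt"

    type req struct{ spec, elem string }

    func main() {
        batches := []struct {
            spec  string
            elems []string
        }{
            {"spec-A", []string{"e0", "e1"}},
            {"spec-B", []string{"e2"}},
        }
        for _, b := range batches {
            first := true
            for _, e := range b.elems {
                r := req{elem: e}
                if first {
                    r.spec = b.spec // only the batch's first request carries the spec
                    first = false
                }
                fmt.Printf("%+v\n", r)
            }
        }
    }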
diff --git a/test/cases/stream/data/input/write_spec.ql
b/test/cases/stream/data/input/write_spec.ql
new file mode 100644
index 00000000..f057350a
--- /dev/null
+++ b/test/cases/stream/data/input/write_spec.ql
@@ -0,0 +1,21 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT trace_id, state, duration, status_code, span_id FROM STREAM sw IN default-spec
+TIME > '-15m'
+WHERE trace_id IN ('spec_trace_2', 'spec_trace_5')
diff --git a/test/cases/stream/data/input/write_spec.yaml
b/test/cases/stream/data/input/write_spec.yaml
new file mode 100644
index 00000000..00fd8854
--- /dev/null
+++ b/test/cases/stream/data/input/write_spec.yaml
@@ -0,0 +1,31 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["default-spec"]
+projection:
+ tagFamilies:
+ - name: "searchable"
+ tags: ["trace_id", "state", "duration", "status_code", "span_id"]
+criteria:
+ condition:
+ name: "trace_id"
+ op: "BINARY_OP_IN"
+ value:
+ strArray:
+ value: ["spec_trace_2", "spec_trace_5"]
+
diff --git a/test/cases/stream/data/testdata/sw_spec_order.json
b/test/cases/stream/data/testdata/sw_spec_order.json
new file mode 100644
index 00000000..5e96c4c3
--- /dev/null
+++ b/test/cases/stream/data/testdata/sw_spec_order.json
@@ -0,0 +1,66 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [{ "binaryData": "YWJjMTIzIT8kKiYoKSctPUB+" }]
+ },
+ {
+ "tags": [
+ { "str": { "value": "spec_trace_1" } },
+ { "int": { "value": 0 } },
+ { "str": { "value": "webapp_id" } },
+ { "str": { "value": "10.0.0.1_id" } },
+ { "str": { "value": "/home_id" } },
+ { "int": { "value": 100 } },
+ { "int": { "value": 1622933202000000000 } },
+ { "str": { "value": "GET" } },
+ { "str": { "value": "200" } },
+ { "str": { "value": "spec_span_1" } }
+ ]
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [{ "binaryData": "YWJjMTIzIT8kKiYoKSctPUB+" }]
+ },
+ {
+ "tags": [
+ { "str": { "value": "spec_trace_2" } },
+ { "int": { "value": 1 } },
+ { "str": { "value": "webapp_id" } },
+ { "str": { "value": "10.0.0.2_id" } },
+ { "str": { "value": "/product_id" } },
+ { "int": { "value": 200 } },
+ { "int": { "value": 1622933202000000000 } },
+ { "str": { "value": "POST" } },
+ { "str": { "value": "201" } },
+ { "str": { "value": "spec_span_2" } }
+ ]
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [{ "binaryData": "YWJjMTIzIT8kKiYoKSctPUB+" }]
+ },
+ {
+ "tags": [
+ { "str": { "value": "spec_trace_3" } },
+ { "int": { "value": 0 } },
+ { "str": { "value": "webapp_id" } },
+ { "str": { "value": "10.0.0.3_id" } },
+ { "str": { "value": "/item_id" } },
+ { "int": { "value": 300 } },
+ { "int": { "value": 1622933202000000000 } },
+ { "str": { "value": "PUT" } },
+ { "str": { "value": "202" } },
+ { "str": { "value": "spec_span_3" } }
+ ]
+ }
+ ]
+ }
+]
+
diff --git a/test/cases/stream/data/testdata/sw_spec_order2.json
b/test/cases/stream/data/testdata/sw_spec_order2.json
new file mode 100644
index 00000000..15b52fb0
--- /dev/null
+++ b/test/cases/stream/data/testdata/sw_spec_order2.json
@@ -0,0 +1,45 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ { "str": { "value": "spec_span_4" } },
+ { "str": { "value": "204" } },
+ { "str": { "value": "DELETE" } },
+ { "int": { "value": 400 } },
+ { "int": { "value": 1 } },
+ { "str": { "value": "/order_id" } },
+ { "str": { "value": "10.0.0.4_id" } },
+ { "int": { "value": 1622933202000000000 } },
+ { "str": { "value": "webapp_id" } },
+ { "str": { "value": "spec_trace_4" } }
+ ]
+ },
+ {
+ "tags": [{ "binaryData": "YWJjMTIzIT8kKiYoKSctPUB+" }]
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ { "str": { "value": "spec_span_5" } },
+ { "str": { "value": "200" } },
+ { "str": { "value": "PATCH" } },
+ { "int": { "value": 500 } },
+ { "int": { "value": 0 } },
+ { "str": { "value": "/cart_id" } },
+ { "str": { "value": "10.0.0.5_id" } },
+ { "int": { "value": 1622933202000000000 } },
+ { "str": { "value": "webapp_id" } },
+ { "str": { "value": "spec_trace_5" } }
+ ]
+ },
+ {
+ "tags": [{ "binaryData": "YWJjMTIzIT8kKiYoKSctPUB+" }]
+ }
+ ]
+ }
+]
+
diff --git a/test/cases/stream/data/want/write_spec.yaml
b/test/cases/stream/data/want/write_spec.yaml
new file mode 100644
index 00000000..a40da633
--- /dev/null
+++ b/test/cases/stream/data/want/write_spec.yaml
@@ -0,0 +1,65 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+elements:
+ - tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: spec_trace_2
+ - key: state
+ value:
+ int:
+ value: "1"
+ - key: duration
+ value:
+ int:
+ value: "200"
+ - key: status_code
+ value:
+ str:
+ value: "201"
+ - key: span_id
+ value:
+ str:
+ value: spec_span_2
+ - tagFamilies:
+ - name: searchable
+ tags:
+ - key: trace_id
+ value:
+ str:
+ value: spec_trace_5
+ - key: state
+ value:
+ int:
+ value: "0"
+ - key: duration
+ value:
+ int:
+ value: "500"
+ - key: status_code
+ value:
+ str:
+ value: "200"
+ - key: span_id
+ value:
+ str:
+ value: spec_span_5
+
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index c0715df0..53e965cb 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -91,4 +91,5 @@ var _ = g.DescribeTable("Scanning Streams", func(args
helpers.Args) {
g.Entry("multi-groups: sort duration", helpers.Args{Input:
"multi_group_sort_duration", Duration: 1 * time.Hour, IgnoreElementID: true}),
g.Entry("hybrid index", helpers.Args{Input: "hybrid_index", Duration: 1
* time.Hour, IgnoreElementID: true}),
g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 *
time.Hour, WantErr: true}),
+ g.Entry("write spec", helpers.Args{Input: "write_spec", Duration: 1 *
time.Hour, IgnoreElementID: true}),
)
diff --git a/test/cases/trace/data/data.go b/test/cases/trace/data/data.go
index b4d8f599..0ace3f05 100644
--- a/test/cases/trace/data/data.go
+++ b/test/cases/trace/data/data.go
@@ -327,6 +327,94 @@ func WriteToGroup(conn *grpclib.ClientConn, name, group,
fileName string, baseTi
}, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
}
+// SpecWithData pairs a TagSpec with a data file.
+type SpecWithData struct {
+ Spec *tracev1.TagSpec
+ DataFile string
+}
+
+// WriteWithSpec writes trace data using tag_spec to specify tag names.
+func WriteWithSpec(conn *grpclib.ClientConn, name, group string,
+	baseTime time.Time, interval time.Duration, specDataPairs ...SpecWithData,
+) {
+ ctx := context.Background()
+ md := &commonv1.Metadata{
+ Name: name,
+ Group: group,
+ }
+
+ schemaClient := databasev1.NewTraceRegistryServiceClient(conn)
+ resp, err := schemaClient.Get(ctx,
&databasev1.TraceRegistryServiceGetRequest{Metadata: md})
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ md = resp.GetTrace().GetMetadata()
+
+ c := tracev1.NewTraceServiceClient(conn)
+ writeClient, err := c.Write(ctx)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ isFirstRequest := true
+ version := uint64(1)
+ currentTime := baseTime
+ for _, pair := range specDataPairs {
+ var templates []interface{}
+ content, err := dataFS.ReadFile("testdata/" + pair.DataFile)
+ gm.Expect(err).ShouldNot(gm.HaveOccurred())
+ gm.Expect(json.Unmarshal(content,
&templates)).ShouldNot(gm.HaveOccurred())
+
+ isFirstForSpec := true
+ for i, template := range templates {
+ templateMap, ok := template.(map[string]interface{})
+ gm.Expect(ok).To(gm.BeTrue())
+
+ spanData, ok := templateMap["span"].(string)
+ gm.Expect(ok).To(gm.BeTrue())
+
+ tagsData, ok := templateMap["tags"].([]interface{})
+ gm.Expect(ok).To(gm.BeTrue())
+
+ var tagValues []*modelv1.TagValue
+ for _, tag := range tagsData {
+ tagBytes, err := json.Marshal(tag)
+ gm.Expect(err).ShouldNot(gm.HaveOccurred())
+ tagValue := &modelv1.TagValue{}
+ gm.Expect(protojson.Unmarshal(tagBytes,
tagValue)).ShouldNot(gm.HaveOccurred())
+ tagValues = append(tagValues, tagValue)
+ }
+
+ timestamp := currentTime.Add(time.Duration(i) *
interval)
+ timestampTag := &modelv1.TagValue{
+ Value: &modelv1.TagValue_Timestamp{
+ Timestamp: timestamppb.New(timestamp),
+ },
+ }
+ tagValues = append(tagValues, timestampTag)
+
+ req := &tracev1.WriteRequest{
+ Tags: tagValues,
+ Span: []byte(spanData),
+ Version: version,
+ }
+ if isFirstRequest {
+ req.Metadata = md
+ isFirstRequest = false
+ }
+ if isFirstForSpec {
+ req.TagSpec = pair.Spec
+ isFirstForSpec = false
+ }
+ gm.Expect(writeClient.Send(req)).Should(gm.Succeed())
+ version++
+ }
+ currentTime = currentTime.Add(time.Duration(len(templates)) *
interval)
+ }
+
+ gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
+ gm.Eventually(func() error {
+ _, err := writeClient.Recv()
+ return err
+ }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
+}
+
// unmarshalYAMLWithSpanEncoding decodes YAML with special handling for span
data.
// It converts plain strings in the YAML to base64 encoded strings before
protobuf unmarshaling.
func unmarshalYAMLWithSpanEncoding(yamlData []byte, response
*tracev1.QueryResponse) {
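One detail worth calling out in the trace WriteWithSpec above: the timestamp tag is appended after the values read from the data file, which is why both TagSpecs in init.go list "timestamp" as the final name even though the other tags are reordered. A tiny sketch of that invariant (simplified types):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        fromFile := []string{"spec_trace_001", "spec_span_001_1"}  // ordered per the spec
        tags := append(fromFile, time.Now().Format(time.RFC3339)) // timestamp always last
        fmt.Println(tags)
    }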
diff --git a/test/cases/trace/data/input/write_spec.ql
b/test/cases/trace/data/input/write_spec.ql
new file mode 100644
index 00000000..3f9ee96e
--- /dev/null
+++ b/test/cases/trace/data/input/write_spec.ql
@@ -0,0 +1,21 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT trace_id, state, duration, span_id FROM TRACE sw IN test-trace-spec
+TIME > '-15m'
+WHERE trace_id IN ('spec_trace_001', 'spec_trace_003')
diff --git a/test/cases/trace/data/input/write_spec.yml
b/test/cases/trace/data/input/write_spec.yml
new file mode 100644
index 00000000..3ca0807d
--- /dev/null
+++ b/test/cases/trace/data/input/write_spec.yml
@@ -0,0 +1,28 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["test-trace-spec"]
+tag_projection: ["trace_id", "state", "duration", "span_id"]
+criteria:
+ condition:
+ name: "trace_id"
+ op: "BINARY_OP_IN"
+ value:
+ str_array:
+ value: ["spec_trace_001", "spec_trace_003"]
+
diff --git a/test/cases/trace/data/testdata/sw_spec_order.json
b/test/cases/trace/data/testdata/sw_spec_order.json
new file mode 100644
index 00000000..264e0d33
--- /dev/null
+++ b/test/cases/trace/data/testdata/sw_spec_order.json
@@ -0,0 +1,123 @@
+[
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "spec_trace_001"
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ },
+ {
+ "str": {
+ "value": "webapp_service"
+ }
+ },
+ {
+ "str": {
+ "value": "webapp_instance_1"
+ }
+ },
+ {
+ "str": {
+ "value": "/home_endpoint"
+ }
+ },
+ {
+ "int": {
+ "value": 1100
+ }
+ },
+ {
+ "str": {
+ "value": "spec_span_001_1"
+ }
+ }
+ ],
+ "span": "spec_trace_001_span_1"
+ },
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "spec_trace_001"
+ }
+ },
+ {
+ "int": {
+ "value": 0
+ }
+ },
+ {
+ "str": {
+ "value": "webapp_service"
+ }
+ },
+ {
+ "str": {
+ "value": "webapp_instance_2"
+ }
+ },
+ {
+ "str": {
+ "value": "/product_endpoint"
+ }
+ },
+ {
+ "int": {
+ "value": 600
+ }
+ },
+ {
+ "str": {
+ "value": "spec_span_001_2"
+ }
+ }
+ ],
+ "span": "spec_trace_001_span_2"
+ },
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "spec_trace_002"
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ },
+ {
+ "str": {
+ "value": "webapp_service"
+ }
+ },
+ {
+ "str": {
+ "value": "webapp_instance_3"
+ }
+ },
+ {
+ "str": {
+ "value": "/item_endpoint"
+ }
+ },
+ {
+ "int": {
+ "value": 900
+ }
+ },
+ {
+ "str": {
+ "value": "spec_span_002_1"
+ }
+ }
+ ],
+ "span": "spec_trace_002_span_1"
+ }
+]
+
diff --git a/test/cases/trace/data/testdata/sw_spec_order2.json
b/test/cases/trace/data/testdata/sw_spec_order2.json
new file mode 100644
index 00000000..6b4d8164
--- /dev/null
+++ b/test/cases/trace/data/testdata/sw_spec_order2.json
@@ -0,0 +1,83 @@
+[
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "spec_span_003_1"
+ }
+ },
+ {
+ "int": {
+ "value": 750
+ }
+ },
+ {
+ "str": {
+ "value": "/order_endpoint"
+ }
+ },
+ {
+ "str": {
+ "value": "webapp_instance_1"
+ }
+ },
+ {
+ "str": {
+ "value": "webapp_service"
+ }
+ },
+ {
+ "int": {
+ "value": 0
+ }
+ },
+ {
+ "str": {
+ "value": "spec_trace_003"
+ }
+ }
+ ],
+ "span": "spec_trace_003_span_1"
+ },
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "spec_span_003_2"
+ }
+ },
+ {
+ "int": {
+ "value": 450
+ }
+ },
+ {
+ "str": {
+ "value": "/cart_endpoint"
+ }
+ },
+ {
+ "str": {
+ "value": "webapp_instance_2"
+ }
+ },
+ {
+ "str": {
+ "value": "webapp_service"
+ }
+ },
+ {
+ "int": {
+ "value": 0
+ }
+ },
+ {
+ "str": {
+ "value": "spec_trace_003"
+ }
+ }
+ ],
+ "span": "spec_trace_003_span_2"
+ }
+]
+
diff --git a/test/cases/trace/data/want/write_spec.yml
b/test/cases/trace/data/want/write_spec.yml
new file mode 100644
index 00000000..b0d6305c
--- /dev/null
+++ b/test/cases/trace/data/want/write_spec.yml
@@ -0,0 +1,96 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+traces:
+ - spans:
+ - span: spec_trace_001_span_1
+ spanId: spec_span_001_1
+ tags:
+ - key: state
+ value:
+ int:
+ value: "1"
+ - key: duration
+ value:
+ int:
+ value: "1100"
+ - key: trace_id
+ value:
+ str:
+ value: spec_trace_001
+ - key: span_id
+ value:
+ str:
+ value: spec_span_001_1
+ - span: spec_trace_001_span_2
+ spanId: spec_span_001_2
+ tags:
+ - key: state
+ value:
+ int: {}
+ - key: duration
+ value:
+ int:
+ value: "600"
+ - key: trace_id
+ value:
+ str:
+ value: spec_trace_001
+ - key: span_id
+ value:
+ str:
+ value: spec_span_001_2
+ traceId: spec_trace_001
+ - spans:
+ - span: spec_trace_003_span_1
+ spanId: spec_span_003_1
+ tags:
+ - key: state
+ value:
+ int: {}
+ - key: duration
+ value:
+ int:
+ value: "750"
+ - key: trace_id
+ value:
+ str:
+ value: spec_trace_003
+ - key: span_id
+ value:
+ str:
+ value: spec_span_003_1
+ - span: spec_trace_003_span_2
+ spanId: spec_span_003_2
+ tags:
+ - key: state
+ value:
+ int: {}
+ - key: duration
+ value:
+ int:
+ value: "450"
+ - key: trace_id
+ value:
+ str:
+ value: spec_trace_003
+ - key: span_id
+ value:
+ str:
+ value: spec_span_003_2
+ traceId: spec_trace_003
+
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index fdf141f1..cc72cf12 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -59,4 +59,5 @@ var _ = g.DescribeTable("Scanning Traces", func(args
helpers.Args) {
g.Entry("multi-groups: new tag", helpers.Args{Input:
"multi_group_new_tag", Duration: 1 * time.Hour}),
g.Entry("multi-groups: tag type change", helpers.Args{Input:
"multi_group_tag_type", Duration: 1 * time.Hour}),
g.Entry("multi-groups: sort by duration", helpers.Args{Input:
"multi_group_sort_duration", Duration: 1 * time.Hour}),
+ g.Entry("write spec", helpers.Args{Input: "write_spec", Duration: 1 *
time.Hour, DisOrder: true}),
)