This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new c0f72ce7 Implement multi-group query for trace (#800)
c0f72ce7 is described below
commit c0f72ce739085f5ad53d20db05527f02f3d2c4d9
Author: Huang Youliang <[email protected]>
AuthorDate: Wed Oct 8 21:34:55 2025 +0800
Implement multi-group query for trace (#800)
---
banyand/query/processor.go | 30 ++--
pkg/query/logical/trace/trace_analyzer.go | 39 ++--
pkg/query/logical/trace/trace_plan_distributed.go | 6 +-
pkg/query/logical/trace/trace_plan_local.go | 11 +-
pkg/query/logical/trace/trace_plan_merge.go | 199 ++++++++++++++++++++-
pkg/query/logical/trace/trace_plan_tag_filter.go | 2 +
pkg/query/model/model.go | 11 +-
.../trace/testdata/groups/test-trace-updated.json | 19 ++
.../testdata/index_rule_bindings/sw_updated.json | 17 ++
.../testdata/index_rules/duration_updated.json | 14 ++
.../testdata/index_rules/timestamp_updated.json | 14 ++
pkg/test/trace/testdata/traces/sw_updated.json | 43 +++++
test/cases/init.go | 1 +
.../cases/trace/data/input/multi_group_new_tag.yml | 27 +++
.../trace/data/input/multi_group_sort_duration.yml | 31 ++++
.../trace/data/input/multi_group_tag_type.yml | 27 +++
.../trace/data/input/multi_group_unchanged.yml | 27 +++
test/cases/trace/data/testdata/sw_updated.json | 122 +++++++++++++
test/cases/trace/data/want/multi_group_new_tag.yml | 179 ++++++++++++++++++
.../trace/data/want/multi_group_sort_duration.yml | 81 +++++++++
.../cases/trace/data/want/multi_group_tag_type.yml | 190 ++++++++++++++++++++
.../trace/data/want/multi_group_unchanged.yml | 142 +++++++++++++++
test/cases/trace/trace.go | 4 +
23 files changed, 1184 insertions(+), 52 deletions(-)
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 73cbbbc6..06c50ee8 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -464,8 +464,7 @@ func (p *traceQueryProcessor) executeQuery(ctx
context.Context, queryCriteria *t
var metadata []*commonv1.Metadata
var schemas []logical.Schema
var ecc []executor.TraceExecutionContext
- var traceIDTagName string
- var timestampTagName string
+ var traceIDTagNames []string
for i := range queryCriteria.Groups {
meta := &commonv1.Metadata{
Name: queryCriteria.Name,
@@ -484,16 +483,10 @@ func (p *traceQueryProcessor) executeQuery(ctx
context.Context, queryCriteria *t
}
schemas = append(schemas, s)
metadata = append(metadata, meta)
- if traceIDTagName != "" && traceIDTagName !=
ec.GetSchema().GetTraceIdTagName() {
- resp = bus.NewMessage(bus.MessageID(now),
common.NewError("trace id tag name mismatch for trace %s: %s != %s",
- meta.GetName(), traceIDTagName,
ec.GetSchema().GetTraceIdTagName()))
- return
- }
- traceIDTagName = ec.GetSchema().GetTraceIdTagName()
- timestampTagName = ec.GetSchema().GetTimestampTagName()
+ traceIDTagNames = append(traceIDTagNames,
ec.GetSchema().GetTraceIdTagName())
}
- plan, err := logical_trace.Analyze(queryCriteria, metadata, schemas,
ecc, traceIDTagName, timestampTagName)
+ plan, err := logical_trace.Analyze(queryCriteria, metadata, schemas,
ecc, traceIDTagNames)
if err != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for trace %s: %v", queryCriteria.GetName(), err))
return
@@ -533,12 +526,16 @@ func (p *traceQueryProcessor) executeQuery(ctx
context.Context, queryCriteria *t
return
}
- // Convert model.TraceResult iterator to tracev1.QueryResponse format
- // Each result contains spans from a single trace, so we can directly
create traces
var traces []*tracev1.InternalTrace
- // Check if trace ID tag should be included based on tag projection
- shouldIncludeTraceID := slices.Contains(queryCriteria.TagProjection,
traceIDTagName)
+ traceIDTagNameMap := make(map[int]string)
+ traceIDInclusionMap := make(map[int]bool)
+ for i, tagName := range traceIDTagNames {
+ traceIDTagNameMap[i] = tagName
+ if slices.Contains(queryCriteria.TagProjection, tagName) {
+ traceIDInclusionMap[i] = true
+ }
+ }
for {
result, hasNext := resultIterator.Next()
@@ -575,9 +572,10 @@ func (p *traceQueryProcessor) executeQuery(ctx
context.Context, queryCriteria *t
}
// Add trace ID tag to each span if it should be
included
- if shouldIncludeTraceID && result.TID != "" {
+ // Use the group index to select the correct
traceIDTagName
+ if traceIDInclusionMap[result.GroupIndex] && result.TID
!= "" {
traceTags = append(traceTags, &modelv1.Tag{
- Key: traceIDTagName,
+ Key:
traceIDTagNameMap[result.GroupIndex],
Value: &modelv1.TagValue{
Value: &modelv1.TagValue_Str{
Str: &modelv1.Str{
diff --git a/pkg/query/logical/trace/trace_analyzer.go
b/pkg/query/logical/trace/trace_analyzer.go
index 99124168..f9d56725 100644
--- a/pkg/query/logical/trace/trace_analyzer.go
+++ b/pkg/query/logical/trace/trace_analyzer.go
@@ -33,29 +33,28 @@ const defaultLimit uint32 = 20
// Analyze converts logical expressions to executable operation tree
represented by Plan.
func Analyze(criteria *tracev1.QueryRequest, metadata []*commonv1.Metadata, ss
[]logical.Schema,
- ecc []executor.TraceExecutionContext, traceIDTagName, timestampTagName
string,
+ ecc []executor.TraceExecutionContext, traceIDTagNames []string,
) (logical.Plan, error) {
- // parse fields
if len(metadata) != len(ss) {
return nil, fmt.Errorf("number of schemas %d not equal to
number of metadata %d", len(ss), len(metadata))
}
- var orderByTag string
- if criteria.OrderBy != nil {
- indexRuleName := criteria.OrderBy.IndexRuleName
- ok, indexRule := ss[0].IndexRuleDefined(indexRuleName)
- if !ok {
- return nil, fmt.Errorf("index rule %s not found",
indexRuleName)
- }
- ot := indexRule.Tags[len(indexRule.Tags)-1]
- if ot != timestampTagName {
- orderByTag = ot
- }
+ if len(traceIDTagNames) != len(metadata) {
+ return nil, fmt.Errorf("number of traceIDTagNames %d not equal
to number of metadata %d", len(traceIDTagNames), len(metadata))
}
+
var plan logical.UnresolvedPlan
var s logical.Schema
tagProjection :=
convertStringProjectionToTags(criteria.GetTagProjection())
if len(metadata) == 1 {
- plan = parseTraceTags(criteria, metadata[0], ecc[0],
tagProjection, traceIDTagName, orderByTag)
+ var orderByTag string
+ if criteria.OrderBy != nil && criteria.OrderBy.IndexRuleName !=
"" {
+ ok, indexRule :=
ss[0].IndexRuleDefined(criteria.OrderBy.IndexRuleName)
+ if !ok {
+ return nil, fmt.Errorf("index rule %s not
found", criteria.OrderBy.IndexRuleName)
+ }
+ orderByTag = indexRule.Tags[len(indexRule.Tags)-1]
+ }
+ plan = parseTraceTags(criteria, metadata[0], ecc[0],
tagProjection, traceIDTagNames[0], orderByTag, 0)
s = ss[0]
} else {
var err error
@@ -63,10 +62,11 @@ func Analyze(criteria *tracev1.QueryRequest, metadata
[]*commonv1.Metadata, ss [
return nil, err
}
plan = &unresolvedTraceMerger{
- criteria: criteria,
- metadata: metadata,
- ecc: ecc,
- tagProjection: tagProjection,
+ criteria: criteria,
+ metadata: metadata,
+ ecc: ecc,
+ tagProjection: tagProjection,
+ traceIDTagNames: traceIDTagNames,
}
}
@@ -223,7 +223,7 @@ func newTraceLimit(input logical.UnresolvedPlan, offset,
num uint32) logical.Unr
}
func parseTraceTags(criteria *tracev1.QueryRequest, metadata
*commonv1.Metadata,
- ec executor.TraceExecutionContext, tagProjection [][]*logical.Tag,
traceIDTagName, orderByTag string,
+ ec executor.TraceExecutionContext, tagProjection [][]*logical.Tag,
traceIDTagName, orderByTag string, groupIndex int,
) logical.UnresolvedPlan {
timeRange := criteria.GetTimeRange()
return &unresolvedTraceTagFilter{
@@ -235,6 +235,7 @@ func parseTraceTags(criteria *tracev1.QueryRequest,
metadata *commonv1.Metadata,
ec: ec,
traceIDTagName: traceIDTagName,
orderByTag: orderByTag,
+ groupIndex: groupIndex,
}
}
diff --git a/pkg/query/logical/trace/trace_plan_distributed.go
b/pkg/query/logical/trace/trace_plan_distributed.go
index 092c4f5d..1238967e 100644
--- a/pkg/query/logical/trace/trace_plan_distributed.go
+++ b/pkg/query/logical/trace/trace_plan_distributed.go
@@ -151,7 +151,7 @@ func (p *distributedPlan) Execute(ctx context.Context)
(iter.Iterator[model.Trac
return iter.Empty[model.TraceResult](), err
}
var allErr error
- var ct []sort.Iterator[*comparableTrace]
+ var st []sort.Iterator[*comparableTrace]
for _, f := range ff {
if m, getErr := f.Get(); getErr != nil {
allErr = multierr.Append(allErr, getErr)
@@ -164,11 +164,11 @@ func (p *distributedPlan) Execute(ctx context.Context)
(iter.Iterator[model.Trac
if span != nil {
span.AddSubTrace(resp.TraceQueryResult)
}
- ct = append(ct,
+ st = append(st,
newSortableTraces(resp.InternalTraces,
p.sortByTraceID))
}
}
- sortIter := sort.NewItemIter(ct, p.desc)
+ sortIter := sort.NewItemIter(st, p.desc)
var result []*tracev1.InternalTrace
seen := make(map[string]bool)
for sortIter.Next() {
diff --git a/pkg/query/logical/trace/trace_plan_local.go
b/pkg/query/logical/trace/trace_plan_local.go
index 93862f32..9dae3174 100644
--- a/pkg/query/logical/trace/trace_plan_local.go
+++ b/pkg/query/logical/trace/trace_plan_local.go
@@ -55,6 +55,7 @@ type localScan struct {
maxTraceSize int
minVal int64
maxVal int64
+ groupIndex int
}
func (i *localScan) Close() {
@@ -108,14 +109,15 @@ func (i *localScan) Execute(ctx context.Context)
(iter.Iterator[model.TraceResul
}
// Return a custom iterator that continuously pulls from i.result
- return &traceResultIterator{result: i.result}, nil
+ return &traceResultIterator{result: i.result, groupIndex:
i.groupIndex}, nil
}
// traceResultIterator implements iter.Iterator[model.TraceResult] by
continuously
// calling Pull() on the TraceQueryResult until it returns nil or encounters
an error.
type traceResultIterator struct {
- result model.TraceQueryResult
- err error
+ result model.TraceQueryResult
+ err error
+ groupIndex int
}
func (tri *traceResultIterator) Next() (model.TraceResult, bool) {
@@ -134,6 +136,9 @@ func (tri *traceResultIterator) Next() (model.TraceResult,
bool) {
return *traceResult, false
}
+ // Set the group index
+ traceResult.GroupIndex = tri.groupIndex
+
return *traceResult, true
}
diff --git a/pkg/query/logical/trace/trace_plan_merge.go
b/pkg/query/logical/trace/trace_plan_merge.go
index 8ca581f6..e32f89fb 100644
--- a/pkg/query/logical/trace/trace_plan_merge.go
+++ b/pkg/query/logical/trace/trace_plan_merge.go
@@ -18,24 +18,211 @@
package trace
import (
+ "context"
"fmt"
+ "go.uber.org/multierr"
+
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
tracev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/iter"
+ "github.com/apache/skywalking-banyandb/pkg/iter/sort"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
+ "github.com/apache/skywalking-banyandb/pkg/query/model"
)
var _ logical.UnresolvedPlan = (*unresolvedTraceMerger)(nil)
type unresolvedTraceMerger struct {
- criteria *tracev1.QueryRequest
- metadata []*commonv1.Metadata
- ecc []executor.TraceExecutionContext
- tagProjection [][]*logical.Tag
+ criteria *tracev1.QueryRequest
+ metadata []*commonv1.Metadata
+ ecc []executor.TraceExecutionContext
+ tagProjection [][]*logical.Tag
+ traceIDTagNames []string
}
// Analyze implements logical.UnresolvedPlan.
-func (u *unresolvedTraceMerger) Analyze(_ logical.Schema) (logical.Plan,
error) {
- return nil, fmt.Errorf("trace merger not implemented yet")
+func (u *unresolvedTraceMerger) Analyze(s logical.Schema) (logical.Plan,
error) {
+ ss := s.Children()
+ if len(ss) != len(u.metadata) {
+ return nil, fmt.Errorf("number of schemas %d not equal to
metadata count %d", len(ss), len(u.metadata))
+ }
+
+ if len(u.tagProjection) > 0 {
+ projectionTagRefs, err := s.CreateTagRef(u.tagProjection...)
+ if err != nil {
+ return nil, err
+ }
+ s = s.ProjTags(projectionTagRefs...)
+ }
+ mp := &traceMergePlan{
+ s: s,
+ }
+
+ for i := range u.metadata {
+ var orderByTag string
+ if u.criteria.OrderBy != nil &&
u.criteria.OrderBy.IndexRuleName != "" {
+ ok, indexRule :=
ss[i].IndexRuleDefined(u.criteria.OrderBy.IndexRuleName)
+ if !ok {
+ return nil, fmt.Errorf("index rule %s not found
in schema %d", u.criteria.OrderBy.IndexRuleName, i)
+ }
+ tags := indexRule.GetTags()
+ if len(tags) > 0 {
+ orderByTag = tags[len(tags)-1]
+ }
+ }
+ subPlan := parseTraceTags(u.criteria, u.metadata[i], u.ecc[i],
u.tagProjection, u.traceIDTagNames[i], orderByTag, i)
+ sp, err := subPlan.Analyze(ss[i])
+ if err != nil {
+ return nil, err
+ }
+ mp.subPlans = append(mp.subPlans, sp)
+ }
+
+ if u.criteria.OrderBy == nil {
+ mp.sortByTraceID = true
+ return mp, nil
+ }
+ if u.criteria.OrderBy.IndexRuleName == "" {
+ mp.sortByTraceID = true
+ if u.criteria.OrderBy.Sort == modelv1.Sort_SORT_DESC {
+ mp.desc = true
+ }
+ return mp, nil
+ }
+ mp.sortByTraceID = false
+ if u.criteria.OrderBy.Sort == modelv1.Sort_SORT_DESC {
+ mp.desc = true
+ }
+ return mp, nil
+}
+
+var (
+ _ logical.Plan = (*traceMergePlan)(nil)
+ _ executor.TraceExecutable = (*traceMergePlan)(nil)
+)
+
+type traceMergePlan struct {
+ s logical.Schema
+ subPlans []logical.Plan
+ sortByTraceID bool
+ desc bool
+}
+
+func (t *traceMergePlan) Close() {
+ for _, sp := range t.subPlans {
+ sp.(executor.TraceExecutable).Close()
+ }
+}
+
+func (t *traceMergePlan) Execute(ctx context.Context)
(iter.Iterator[model.TraceResult], error) {
+ var allErr error
+ var iters []sort.Iterator[*comparableTraceResult]
+
+ for _, sp := range t.subPlans {
+ resultIter, err := sp.(executor.TraceExecutable).Execute(ctx)
+ if err != nil {
+ allErr = multierr.Append(allErr, err)
+ continue
+ }
+ iter := newSortableTraceResults(resultIter, t.sortByTraceID)
+ iters = append(iters, iter)
+ }
+ if allErr != nil {
+ return iter.Empty[model.TraceResult](), allErr
+ }
+
+ sortedIter := sort.NewItemIter(iters, t.desc)
+ return &mergedTraceResultIterator{
+ Iterator: sortedIter,
+ }, nil
+}
+
+func (t *traceMergePlan) Children() []logical.Plan {
+ return t.subPlans
+}
+
+func (t *traceMergePlan) Schema() logical.Schema {
+ return t.s
+}
+
+func (t *traceMergePlan) String() string {
+ return fmt.Sprintf("TraceMergePlan: subPlans=%d, sortByTraceID=%t,
desc=%t",
+ len(t.subPlans), t.sortByTraceID, t.desc)
+}
+
+type comparableTraceResult struct {
+ sortedField []byte
+ result model.TraceResult
+ groupIndex int
+}
+
+func newComparableTraceResult(result model.TraceResult, sortByTraceID bool,
groupIndex int) *comparableTraceResult {
+ ct := &comparableTraceResult{
+ result: result,
+ groupIndex: groupIndex,
+ }
+ if sortByTraceID {
+ ct.sortedField = []byte(result.TID)
+ } else {
+ ct.sortedField = convert.Int64ToBytes(result.Key)
+ }
+ return ct
+}
+
+func (c *comparableTraceResult) SortedField() []byte {
+ return c.sortedField
+}
+
+type sortableTraceResults struct {
+ iter iter.Iterator[model.TraceResult]
+ current *comparableTraceResult
+ sortByTraceID bool
+}
+
+func newSortableTraceResults(iter iter.Iterator[model.TraceResult],
sortByTraceID bool) *sortableTraceResults {
+ return &sortableTraceResults{
+ iter: iter,
+ sortByTraceID: sortByTraceID,
+ }
+}
+
+func (s *sortableTraceResults) Next() bool {
+ result, hasNext := s.iter.Next()
+ if !hasNext {
+ return false
+ }
+ if result.Error != nil {
+ return false
+ }
+
+ s.current = newComparableTraceResult(result, s.sortByTraceID,
result.GroupIndex)
+ return true
+}
+
+func (s *sortableTraceResults) Val() *comparableTraceResult {
+ return s.current
+}
+
+func (s *sortableTraceResults) Close() error {
+ return nil
+}
+
+type mergedTraceResultIterator struct {
+ Iterator sort.Iterator[*comparableTraceResult]
+}
+
+func (s *mergedTraceResultIterator) Next() (model.TraceResult, bool) {
+ if !s.Iterator.Next() {
+ return model.TraceResult{}, false
+ }
+
+ ct := s.Iterator.Val()
+ if ct == nil {
+ return model.TraceResult{}, false
+ }
+ return ct.result, true
}
diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go
b/pkg/query/logical/trace/trace_plan_tag_filter.go
index 270d795d..c4a207ce 100644
--- a/pkg/query/logical/trace/trace_plan_tag_filter.go
+++ b/pkg/query/logical/trace/trace_plan_tag_filter.go
@@ -46,6 +46,7 @@ type unresolvedTraceTagFilter struct {
traceIDTagName string
orderByTag string
projectionTags [][]*logical.Tag
+ groupIndex int
}
func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan,
error) {
@@ -143,6 +144,7 @@ func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx
*traceAnalyzeContext
traceIDs: traceIDs,
minVal: minVal,
maxVal: maxVal,
+ groupIndex: uis.groupIndex,
}
}
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index e13d0963..707872bd 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -354,11 +354,12 @@ func (t *TraceQueryOptions) CopyFrom(other
*TraceQueryOptions) {
// TraceResult is the result of a query.
type TraceResult struct {
- Error error
- TID string
- Spans [][]byte
- Tags []Tag
- Key int64
+ Error error
+ TID string
+ Spans [][]byte
+ Tags []Tag
+ Key int64
+ GroupIndex int
}
// TraceQueryResult is the result of a trace query.
diff --git a/pkg/test/trace/testdata/groups/test-trace-updated.json
b/pkg/test/trace/testdata/groups/test-trace-updated.json
new file mode 100644
index 00000000..b6fa7648
--- /dev/null
+++ b/pkg/test/trace/testdata/groups/test-trace-updated.json
@@ -0,0 +1,19 @@
+{
+ "metadata": {
+ "name": "test-trace-updated"
+ },
+ "catalog": "CATALOG_TRACE",
+ "resource_opts": {
+ "shard_num": 2,
+ "replicas": 0,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ }
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rule_bindings/sw_updated.json
b/pkg/test/trace/testdata/index_rule_bindings/sw_updated.json
new file mode 100644
index 00000000..c611a490
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rule_bindings/sw_updated.json
@@ -0,0 +1,17 @@
+{
+ "metadata": {
+ "name": "sw-updated-index-rule-binding",
+ "group": "test-trace-updated"
+ },
+ "rules": [
+ "duration",
+ "timestamp"
+ ],
+ "subject": {
+ "catalog": "CATALOG_TRACE",
+ "name": "sw"
+ },
+ "begin_at": "2021-04-15T01:30:15.01Z",
+ "expire_at": "2121-04-15T01:30:15.01Z",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rules/duration_updated.json
b/pkg/test/trace/testdata/index_rules/duration_updated.json
new file mode 100644
index 00000000..d31796ad
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rules/duration_updated.json
@@ -0,0 +1,14 @@
+{
+ "metadata": {
+ "name": "duration",
+ "group": "test-trace-updated"
+ },
+ "tags": [
+ "service_id",
+ "service_instance_id",
+ "state",
+ "duration"
+ ],
+ "type": "TYPE_TREE",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rules/timestamp_updated.json
b/pkg/test/trace/testdata/index_rules/timestamp_updated.json
new file mode 100644
index 00000000..2b947fa3
--- /dev/null
+++ b/pkg/test/trace/testdata/index_rules/timestamp_updated.json
@@ -0,0 +1,14 @@
+{
+ "metadata": {
+ "name": "timestamp",
+ "group": "test-trace-updated"
+ },
+ "tags": [
+ "service_id",
+ "service_instance_id",
+ "state",
+ "timestamp"
+ ],
+ "type": "TYPE_TREE",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/traces/sw_updated.json
b/pkg/test/trace/testdata/traces/sw_updated.json
new file mode 100644
index 00000000..78c56e63
--- /dev/null
+++ b/pkg/test/trace/testdata/traces/sw_updated.json
@@ -0,0 +1,43 @@
+{
+ "metadata": {
+ "name": "sw",
+ "group": "test-trace-updated"
+ },
+ "tags": [
+ {
+ "name": "trace_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "state",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "service_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "service_instance_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "endpoint_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "duration",
+ "type": "TAG_TYPE_INT"
+ },
+ {
+ "name": "error_message",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "timestamp",
+ "type": "TAG_TYPE_TIMESTAMP"
+ }
+ ],
+ "trace_id_tag_name": "trace_id",
+ "timestamp_tag_name": "timestamp",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/test/cases/init.go b/test/cases/init.go
index 9c4bb645..fa6d96fb 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -65,6 +65,7 @@ func Initialize(addr string, now time.Time) {
interval = 500 * time.Millisecond
casestrace.WriteToGroup(conn, "sw", "test-trace-group", "sw", now,
interval)
casestrace.WriteToGroup(conn, "zipkin", "zipkinTrace", "zipkin", now,
interval)
+ casestrace.WriteToGroup(conn, "sw", "test-trace-updated", "sw_updated",
now.Add(time.Minute), interval)
time.Sleep(5 * time.Second)
casestrace.WriteToGroup(conn, "sw", "test-trace-group",
"sw_mixed_traces", now.Add(time.Minute), interval)
}
diff --git a/test/cases/trace/data/input/multi_group_new_tag.yml
b/test/cases/trace/data/input/multi_group_new_tag.yml
new file mode 100644
index 00000000..65d45b59
--- /dev/null
+++ b/test/cases/trace/data/input/multi_group_new_tag.yml
@@ -0,0 +1,27 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["test-trace-group", "test-trace-updated"]
+tag_projection: ["trace_id", "service_id", "error_message"]
+criteria:
+ condition:
+ name: "trace_id"
+ op: "BINARY_OP_IN"
+ value:
+ str_array:
+ value: ["trace_001", "trace_002", "trace_009", "trace_010"]
diff --git a/test/cases/trace/data/input/multi_group_sort_duration.yml
b/test/cases/trace/data/input/multi_group_sort_duration.yml
new file mode 100644
index 00000000..a2c8f097
--- /dev/null
+++ b/test/cases/trace/data/input/multi_group_sort_duration.yml
@@ -0,0 +1,31 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["test-trace-group", "test-trace-updated"]
+tag_projection: ["trace_id", "duration"]
+criteria:
+ condition:
+ name: "duration"
+ op: "BINARY_OP_GE"
+ value:
+ int:
+ value: "500"
+orderBy:
+ indexRuleName: "duration"
+ sort: "SORT_DESC"
+limit: 3
diff --git a/test/cases/trace/data/input/multi_group_tag_type.yml
b/test/cases/trace/data/input/multi_group_tag_type.yml
new file mode 100644
index 00000000..6f554d26
--- /dev/null
+++ b/test/cases/trace/data/input/multi_group_tag_type.yml
@@ -0,0 +1,27 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["test-trace-group", "test-trace-updated"]
+tag_projection: ["trace_id", "service_id", "state"]
+criteria:
+ condition:
+ name: "trace_id"
+ op: "BINARY_OP_IN"
+ value:
+ str_array:
+ value: ["trace_001", "trace_002", "trace_009", "trace_010"]
diff --git a/test/cases/trace/data/input/multi_group_unchanged.yml
b/test/cases/trace/data/input/multi_group_unchanged.yml
new file mode 100644
index 00000000..6adb4698
--- /dev/null
+++ b/test/cases/trace/data/input/multi_group_unchanged.yml
@@ -0,0 +1,27 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["test-trace-group", "test-trace-updated"]
+tag_projection: ["trace_id", "service_id"]
+criteria:
+ condition:
+ name: "trace_id"
+ op: "BINARY_OP_IN"
+ value:
+ str_array:
+ value: ["trace_001", "trace_002", "trace_009", "trace_010"]
diff --git a/test/cases/trace/data/testdata/sw_updated.json
b/test/cases/trace/data/testdata/sw_updated.json
new file mode 100644
index 00000000..b4ce7da9
--- /dev/null
+++ b/test/cases/trace/data/testdata/sw_updated.json
@@ -0,0 +1,122 @@
+[
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "trace_009"
+ }
+ },
+ {
+ "str": {
+ "value": "success"
+ }
+ },
+ {
+ "str": {
+ "value": "api_service"
+ }
+ },
+ {
+ "str": {
+ "value": "api_instance_1"
+ }
+ },
+ {
+ "str": {
+ "value": "/api_endpoint"
+ }
+ },
+ {
+ "int": {
+ "value": 1234
+ }
+ },
+ {
+ "str": {
+ "value": ""
+ }
+ }
+ ],
+ "span": "trace_009_span_1"
+ },
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "trace_009"
+ }
+ },
+ {
+ "str": {
+ "value": "success"
+ }
+ },
+ {
+ "str": {
+ "value": "api_service"
+ }
+ },
+ {
+ "str": {
+ "value": "api_instance_2"
+ }
+ },
+ {
+ "str": {
+ "value": "/data_endpoint"
+ }
+ },
+ {
+ "int": {
+ "value": 500
+ }
+ },
+ {
+ "str": {
+ "value": ""
+ }
+ }
+ ],
+ "span": "trace_009_span_2"
+ },
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "trace_010"
+ }
+ },
+ {
+ "str": {
+ "value": "error"
+ }
+ },
+ {
+ "str": {
+ "value": "api_service"
+ }
+ },
+ {
+ "str": {
+ "value": "api_instance_3"
+ }
+ },
+ {
+ "str": {
+ "value": "/api_endpoint"
+ }
+ },
+ {
+ "int": {
+ "value": 91011
+ }
+ },
+ {
+ "str": {
+ "value": "Connection timeout"
+ }
+ }
+ ],
+ "span": "trace_010_span_1"
+ }
+]
\ No newline at end of file
diff --git a/test/cases/trace/data/want/multi_group_new_tag.yml
b/test/cases/trace/data/want/multi_group_new_tag.yml
new file mode 100644
index 00000000..b9dfda6c
--- /dev/null
+++ b/test/cases/trace/data/want/multi_group_new_tag.yml
@@ -0,0 +1,179 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+traces:
+ - spans:
+ - span: trace_001_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: error_message
+ value:
+ "null": null
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_2
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: error_message
+ value:
+ "null": null
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_3
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: error_message
+ value:
+ "null": null
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_4
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: error_message
+ value:
+ "null": null
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_5
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: error_message
+ value:
+ "null": null
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - spans:
+ - span: trace_002_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: error_message
+ value:
+ "null": null
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - span: trace_002_span_2
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: error_message
+ value:
+ "null": null
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - span: trace_002_span_3
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: auth_service
+ - key: error_message
+ value:
+ "null": null
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - span: trace_002_span_4
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: auth_service
+ - key: error_message
+ value:
+ "null": null
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - spans:
+ - span: trace_009_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: error_message
+ value:
+ "null": null
+ - key: trace_id
+ value:
+ str:
+ value: trace_009
+ - span: trace_009_span_2
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: error_message
+ value:
+ "null": null
+ - key: trace_id
+ value:
+ str:
+ value: trace_009
+ - spans:
+ - span: trace_010_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: error_message
+ value:
+ str:
+ value: Connection timeout
+ - key: trace_id
+ value:
+ str:
+ value: trace_010
diff --git a/test/cases/trace/data/want/multi_group_sort_duration.yml
b/test/cases/trace/data/want/multi_group_sort_duration.yml
new file mode 100644
index 00000000..33687ebe
--- /dev/null
+++ b/test/cases/trace/data/want/multi_group_sort_duration.yml
@@ -0,0 +1,81 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+traces:
+ - spans:
+ - span: trace_010_span_1
+ tags:
+ - key: duration
+ value:
+ int:
+ value: "91011"
+ - key: trace_id
+ value:
+ str:
+ value: trace_010
+ - spans:
+ - span: trace_009_span_1
+ tags:
+ - key: duration
+ value:
+ int:
+ value: "1234"
+ - key: trace_id
+ value:
+ str:
+ value: trace_009
+ - span: trace_009_span_2
+ tags:
+ - key: duration
+ value:
+ int:
+ value: "500"
+ - key: trace_id
+ value:
+ str:
+ value: trace_009
+ - spans:
+ - span: trace_003_span_1
+ tags:
+ - key: duration
+ value:
+ int:
+ value: "1200"
+ - key: trace_id
+ value:
+ str:
+ value: trace_003
+ - span: trace_003_span_2
+ tags:
+ - key: duration
+ value:
+ int:
+ value: "150"
+ - key: trace_id
+ value:
+ str:
+ value: trace_003
+ - span: trace_003_span_3
+ tags:
+ - key: duration
+ value:
+ int:
+ value: "400"
+ - key: trace_id
+ value:
+ str:
+ value: trace_003
diff --git a/test/cases/trace/data/want/multi_group_tag_type.yml
b/test/cases/trace/data/want/multi_group_tag_type.yml
new file mode 100644
index 00000000..dc574547
--- /dev/null
+++ b/test/cases/trace/data/want/multi_group_tag_type.yml
@@ -0,0 +1,190 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+traces:
+ - spans:
+ - span: trace_001_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: state
+ value:
+ int:
+ value: "1"
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_2
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: state
+ value:
+ int:
+ value: "0"
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_3
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: state
+ value:
+ int:
+ value: "0"
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_4
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: state
+ value:
+ int:
+ value: "1"
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_5
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: state
+ value:
+ int:
+ value: "0"
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - spans:
+ - span: trace_002_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: state
+ value:
+ int:
+ value: "1"
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - span: trace_002_span_2
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: state
+ value:
+ int:
+ value: "0"
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - span: trace_002_span_3
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: auth_service
+ - key: state
+ value:
+ int:
+ value: "1"
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - span: trace_002_span_4
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: auth_service
+ - key: state
+ value:
+ int:
+ value: "0"
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - spans:
+ - span: trace_009_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: state
+ value:
+ str:
+ value: success
+ - key: trace_id
+ value:
+ str:
+ value: trace_009
+ - span: trace_009_span_2
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: state
+ value:
+ str:
+ value: success
+ - key: trace_id
+ value:
+ str:
+ value: trace_009
+ - spans:
+ - span: trace_010_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: state
+ value:
+ str:
+ value: error
+ - key: trace_id
+ value:
+ str:
+ value: trace_010
diff --git a/test/cases/trace/data/want/multi_group_unchanged.yml
b/test/cases/trace/data/want/multi_group_unchanged.yml
new file mode 100644
index 00000000..7e92d342
--- /dev/null
+++ b/test/cases/trace/data/want/multi_group_unchanged.yml
@@ -0,0 +1,142 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+traces:
+ - spans:
+ - span: trace_001_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_2
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_3
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_4
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - span: trace_001_span_5
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_001
+ - spans:
+ - span: trace_002_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - span: trace_002_span_2
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: webapp_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - span: trace_002_span_3
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: auth_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - span: trace_002_span_4
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: auth_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_002
+ - spans:
+ - span: trace_009_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_009
+ - span: trace_009_span_2
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_009
+ - spans:
+ - span: trace_010_span_1
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: api_service
+ - key: trace_id
+ value:
+ str:
+ value: trace_010
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index a9d91b05..f4db8137 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -54,4 +54,8 @@ var _ = g.DescribeTable("Scanning Traces", func(args
helpers.Args) {
g.Entry("filter by query", helpers.Args{Input: "having_query_tag",
Duration: 1 * time.Hour}),
g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 *
time.Hour, WantErr: true}),
g.Entry("filter by query with having condition", helpers.Args{Input:
"having_query_tag_cond", Want: "having_query_tag", Duration: 1 * time.Hour}),
+ g.Entry("multi-groups: unchanged tags", helpers.Args{Input:
"multi_group_unchanged", Duration: 1 * time.Hour}),
+ g.Entry("multi-groups: new tag", helpers.Args{Input:
"multi_group_new_tag", Duration: 1 * time.Hour}),
+ g.Entry("multi-groups: tag type change", helpers.Args{Input:
"multi_group_tag_type", Duration: 1 * time.Hour}),
+ g.Entry("multi-groups: sort by duration", helpers.Args{Input:
"multi_group_sort_duration", Duration: 1 * time.Hour}),
)