This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 7466afd2 Deduplicate spans across replicas using span_id (#794)
7466afd2 is described below
commit 7466afd2a2120f7fa311983be5a3077cb15d07e7
Author: Huang Youliang <[email protected]>
AuthorDate: Fri Oct 10 15:44:02 2025 +0800
Deduplicate spans across replicas using span_id (#794)
* Deduplicate spans across replicas using span_id
---------
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: Wan Kai <[email protected]>
---
.github/workflows/e2e.yml | 4 +-
api/proto/banyandb/database/v1/schema.proto | 2 +
api/proto/banyandb/trace/v1/query.proto | 2 +
api/validate/validate.go | 3 +
banyand/dquery/trace.go | 18 +-
banyand/query/processor.go | 317 ++++++++++++++-------
banyand/trace/block.go | 116 +++++++-
banyand/trace/block_reader_test.go | 1 +
banyand/trace/block_test.go | 23 +-
banyand/trace/block_writer.go | 4 +-
banyand/trace/merger_test.go | 9 +-
banyand/trace/part.go | 4 +-
banyand/trace/part_test.go | 6 +-
banyand/trace/query_test.go | 6 +-
banyand/trace/traces.go | 3 +
banyand/trace/tstable_test.go | 13 +-
banyand/trace/write_standalone.go | 57 +++-
bydbctl/internal/cmd/trace_test.go | 16 +-
docs/api-reference.md | 2 +
pkg/encoding/bytes.go | 36 +++
pkg/query/logical/tag_filter.go | 12 +-
pkg/query/logical/trace/index_filter.go | 42 +--
pkg/query/logical/trace/trace_analyzer.go | 8 +-
pkg/query/logical/trace/trace_plan_distributed.go | 23 +-
pkg/query/logical/trace/trace_plan_merge.go | 3 +-
pkg/query/logical/trace/trace_plan_tag_filter.go | 102 +++++--
pkg/query/model/model.go | 1 +
pkg/test/trace/testdata/traces/sw.json | 5 +
pkg/test/trace/testdata/traces/sw_updated.json | 5 +
pkg/test/trace/testdata/traces/zipkin.json | 1 +
test/cases/trace/data/testdata/sw.json | 65 +++++
.../cases/trace/data/testdata/sw_mixed_traces.json | 60 ++++
test/cases/trace/data/testdata/sw_updated.json | 15 +
test/cases/trace/data/want/having_query_tag.yml | 15 +-
test/e2e-v2/script/env | 4 +-
.../testdata/schema/trace_schema.json | 1 +
36 files changed, 781 insertions(+), 223 deletions(-)
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index 9cb4ec2e..057c2be7 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -47,8 +47,8 @@ jobs:
config: test/e2e-v2/cases/event/banyandb/e2e.yaml
- name: BanyanDB Cluster Mode
config: test/e2e-v2/cases/cluster/e2e.yaml
- - name: Lifecycle
- config: test/e2e-v2/cases/lifecycle/e2e.yaml
+ # - name: Lifecycle
+ # config: test/e2e-v2/cases/lifecycle/e2e.yaml
env:
TAG: ${{ github.sha }}
steps:
diff --git a/api/proto/banyandb/database/v1/schema.proto b/api/proto/banyandb/database/v1/schema.proto
index 4d25f852..d2359418 100644
--- a/api/proto/banyandb/database/v1/schema.proto
+++ b/api/proto/banyandb/database/v1/schema.proto
@@ -243,4 +243,6 @@ message Trace {
string timestamp_tag_name = 4 [(validate.rules).string.min_len = 1];
// updated_at indicates when the trace resource is updated.
google.protobuf.Timestamp updated_at = 5;
+ // span_id_tag_name is the name of the tag that stores the span ID.
+ string span_id_tag_name = 6 [(validate.rules).string.min_len = 1];
}
diff --git a/api/proto/banyandb/trace/v1/query.proto b/api/proto/banyandb/trace/v1/query.proto
index 84f0d067..d365afd3 100644
--- a/api/proto/banyandb/trace/v1/query.proto
+++ b/api/proto/banyandb/trace/v1/query.proto
@@ -42,6 +42,8 @@ message InternalTrace {
string trace_id = 2;
// key is used for sorting.
int64 key = 3;
+ // span_ids are the ids of the spans that belong to the trace.
+ repeated string span_ids = 4;
}
// Trace contains all spans that belong to a single trace ID.
diff --git a/api/validate/validate.go b/api/validate/validate.go
index ab1b1bab..2987bec5 100644
--- a/api/validate/validate.go
+++ b/api/validate/validate.go
@@ -188,6 +188,9 @@ func Trace(trace *databasev1.Trace) error {
if trace.TraceIdTagName == "" {
return errors.New("trace_id_tag_name is empty")
}
+ if trace.SpanIdTagName == "" {
+ return errors.New("span_id_tag_name is empty")
+ }
if trace.TimestampTagName == "" {
return errors.New("timestamp_tag_name is empty")
}
diff --git a/banyand/dquery/trace.go b/banyand/dquery/trace.go
index 9b49a9d9..4c870724 100644
--- a/banyand/dquery/trace.go
+++ b/banyand/dquery/trace.go
@@ -162,7 +162,6 @@ func (p *traceQueryProcessor) Rev(ctx context.Context, message bus.Message) (res
// BuildTracesFromResult builds traces from the result iterator.
func BuildTracesFromResult(resultIterator iter.Iterator[model.TraceResult],
queryCriteria *tracev1.QueryRequest) ([]*tracev1.InternalTrace, error) {
- traceIndex := make(map[string]int)
traces := make([]*tracev1.InternalTrace, 0)
for {
result, hasNext := resultIterator.Next()
@@ -172,18 +171,12 @@ func BuildTracesFromResult(resultIterator iter.Iterator[model.TraceResult], quer
if !hasNext {
break
}
- traceID := result.TID
- _, exists := traceIndex[traceID]
- var trace *tracev1.InternalTrace
- if !exists {
- trace = &tracev1.InternalTrace{
- Spans: make([]*tracev1.Span, 0),
- }
- traceIndex[traceID] = len(traces)
- traces = append(traces, trace)
- } else {
- trace = traces[traceIndex[traceID]]
+ trace := &tracev1.InternalTrace{
+ TraceId: result.TID,
+ Spans: make([]*tracev1.Span, 0),
+ SpanIds: result.SpanIDs,
}
+ traces = append(traces, trace)
for i, spanBytes := range result.Spans {
var traceTags []*modelv1.Tag
if result.Tags != nil &&
len(queryCriteria.TagProjection) > 0 {
@@ -204,7 +197,6 @@ func BuildTracesFromResult(resultIterator iter.Iterator[model.TraceResult], quer
Span: spanBytes,
}
trace.Spans = append(trace.Spans, span)
- trace.TraceId = result.TID
}
}
return traces, nil
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 06c50ee8..f0415074 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -36,6 +36,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/banyand/trace"
"github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/iter"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
@@ -43,6 +44,7 @@ import (
logical_measure
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
logical_stream
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
logical_trace
"github.com/apache/skywalking-banyandb/pkg/query/logical/trace"
+ "github.com/apache/skywalking-banyandb/pkg/query/model"
)
const (
@@ -451,20 +453,23 @@ func (p *traceQueryProcessor) Rev(ctx context.Context, message bus.Message) (res
return
}
-func (p *traceQueryProcessor) executeQuery(ctx context.Context, queryCriteria
*tracev1.QueryRequest) (resp bus.Message) {
- n := time.Now()
- now := n.UnixNano()
- defer func() {
- if err := recover(); err != nil {
- p.log.Error().Interface("err", err).RawJSON("req",
logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
- resp =
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
- }
- }()
+type traceExecutionPlan struct {
+ traceIDTagNames []string
+ spanIDTagNames []string
+ metadata []*commonv1.Metadata
+ schemas []logical.Schema
+ executionContexts []trace.Trace
+}
+
+func (p *traceQueryProcessor) setupTraceExecutionPlan(queryCriteria
*tracev1.QueryRequest) (*traceExecutionPlan, *common.Error) {
+ plan := &traceExecutionPlan{
+ metadata: make([]*commonv1.Metadata, 0,
len(queryCriteria.Groups)),
+ schemas: make([]logical.Schema, 0,
len(queryCriteria.Groups)),
+ executionContexts: make([]trace.Trace, 0,
len(queryCriteria.Groups)),
+ traceIDTagNames: make([]string, 0, len(queryCriteria.Groups)),
+ spanIDTagNames: make([]string, 0, len(queryCriteria.Groups)),
+ }
- var metadata []*commonv1.Metadata
- var schemas []logical.Schema
- var ecc []executor.TraceExecutionContext
- var traceIDTagNames []string
for i := range queryCriteria.Groups {
meta := &commonv1.Metadata{
Name: queryCriteria.Name,
@@ -472,77 +477,108 @@ func (p *traceQueryProcessor) executeQuery(ctx context.Context, queryCriteria *t
}
ec, err := p.traceService.Trace(meta)
if err != nil {
- resp = bus.NewMessage(bus.MessageID(now),
common.NewError("fail to get execution context for trace %s: %v",
meta.GetName(), err))
- return
+ return nil, common.NewError("fail to get execution
context for trace %s: %v", meta.GetName(), err)
}
- ecc = append(ecc, ec)
+
s, err := logical_trace.BuildSchema(ec.GetSchema(),
ec.GetIndexRules())
if err != nil {
- resp = bus.NewMessage(bus.MessageID(now),
common.NewError("fail to build schema for trace %s: %v", meta.GetName(), err))
- return
+ return nil, common.NewError("fail to build schema for
trace %s: %v", meta.GetName(), err)
}
- schemas = append(schemas, s)
- metadata = append(metadata, meta)
- traceIDTagNames = append(traceIDTagNames,
ec.GetSchema().GetTraceIdTagName())
+
+ // Validate tag name consistency
+ if errMsg := p.validateTagNames(plan, ec, meta); errMsg != nil {
+ return nil, errMsg
+ }
+
+ plan.executionContexts = append(plan.executionContexts, ec)
+ plan.schemas = append(plan.schemas, s)
+ plan.metadata = append(plan.metadata, meta)
+ plan.traceIDTagNames = append(plan.traceIDTagNames,
ec.GetSchema().GetTraceIdTagName())
+ plan.spanIDTagNames = append(plan.spanIDTagNames,
ec.GetSchema().GetSpanIdTagName())
}
- plan, err := logical_trace.Analyze(queryCriteria, metadata, schemas,
ecc, traceIDTagNames)
- if err != nil {
- resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for trace %s: %v", queryCriteria.GetName(), err))
- return
+ return plan, nil
+}
+
+func (p *traceQueryProcessor) validateTagNames(plan *traceExecutionPlan, ec
trace.Trace, meta *commonv1.Metadata) *common.Error {
+ if len(plan.traceIDTagNames) > 0 && plan.traceIDTagNames[0] !=
ec.GetSchema().GetTraceIdTagName() {
+ return common.NewError("trace id tag name mismatch for trace
%s: %s != %s",
+ meta.GetName(), plan.traceIDTagNames[0],
ec.GetSchema().GetTraceIdTagName())
+ }
+ if len(plan.spanIDTagNames) > 0 && plan.spanIDTagNames[0] !=
ec.GetSchema().GetSpanIdTagName() {
+ return common.NewError("span id tag name mismatch for trace %s:
%s != %s",
+ meta.GetName(), plan.spanIDTagNames[0],
ec.GetSchema().GetSpanIdTagName())
}
+ return nil
+}
- if p.log.Debug().Enabled() {
- p.log.Debug().Str("plan", plan.String()).Msg("trace query plan")
+type traceMonitor struct {
+ tracer *query.Tracer
+ span *query.Span
+}
+
+func (p *traceQueryProcessor) setupTraceMonitor(ctx context.Context,
queryCriteria *tracev1.QueryRequest,
+ plan logical.Plan, startTime time.Time,
+) (context.Context, *traceMonitor) {
+ if !queryCriteria.Trace {
+ return ctx, nil
}
- var tracer *query.Tracer
- var span *query.Span
- if queryCriteria.Trace {
- tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
- span, ctx = tracer.StartSpan(ctx, "data-%s",
p.queryService.nodeID)
- span.Tag("plan", plan.String())
- defer func() {
- data := resp.Data()
- switch d := data.(type) {
- case *tracev1.InternalQueryResponse:
- d.TraceQueryResult = tracer.ToProto()
- case *common.Error:
- span.Error(errors.New(d.Error()))
- resp = bus.NewMessage(bus.MessageID(now),
&tracev1.QueryResponse{TraceQueryResult: tracer.ToProto()})
- default:
- panic("unexpected data type")
- }
- span.Stop()
- }()
+ tracer, newCtx := query.NewTracer(ctx,
startTime.Format(time.RFC3339Nano))
+ span, newCtx := tracer.StartSpan(newCtx, "data-%s",
p.queryService.nodeID)
+ span.Tag("plan", plan.String())
+
+ return newCtx, &traceMonitor{
+ tracer: tracer,
+ span: span,
}
+}
- te := plan.(executor.TraceExecutable)
- defer te.Close()
- resultIterator, err := te.Execute(ctx)
- if err != nil {
- p.log.Error().Err(err).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to execute the trace query plan")
- resp = bus.NewMessage(bus.MessageID(now),
common.NewError("execute the query plan for trace %s: %v",
queryCriteria.GetName(), err))
+func (tm *traceMonitor) finishTrace(resp *bus.Message, messageID int64) {
+ if tm == nil {
return
}
+ data := resp.Data()
+ switch d := data.(type) {
+ case *tracev1.InternalQueryResponse:
+ d.TraceQueryResult = tm.tracer.ToProto()
+ case *common.Error:
+ tm.span.Error(errors.New(d.Error()))
+ *resp = bus.NewMessage(bus.MessageID(messageID),
&tracev1.QueryResponse{TraceQueryResult: tm.tracer.ToProto()})
+ default:
+ panic("unexpected data type")
+ }
+ tm.span.Stop()
+}
+
+func (p *traceQueryProcessor) processTraceResults(resultIterator
iter.Iterator[model.TraceResult],
+ queryCriteria *tracev1.QueryRequest, execPlan *traceExecutionPlan,
+) ([]*tracev1.InternalTrace, error) {
var traces []*tracev1.InternalTrace
- traceIDTagNameMap := make(map[int]string)
+ // Build tag inclusion maps for each group
traceIDInclusionMap := make(map[int]bool)
- for i, tagName := range traceIDTagNames {
- traceIDTagNameMap[i] = tagName
+ spanIDInclusionMap := make(map[int]bool)
+ for i, tagName := range execPlan.traceIDTagNames {
if slices.Contains(queryCriteria.TagProjection, tagName) {
traceIDInclusionMap[i] = true
}
}
+ for i, tagName := range execPlan.spanIDTagNames {
+ if slices.Contains(queryCriteria.TagProjection, tagName) {
+ spanIDInclusionMap[i] = true
+ }
+ }
for {
result, hasNext := resultIterator.Next()
if !hasNext {
break
}
-
+ if result.Error != nil {
+ return nil, result.Error
+ }
if result.TID == "" {
// Skip spans without trace ID
continue
@@ -550,65 +586,148 @@ func (p *traceQueryProcessor) executeQuery(ctx context.Context, queryCriteria *t
// Create a trace for this result
trace := &tracev1.InternalTrace{
- Spans: make([]*tracev1.Span, 0, len(result.Spans)),
+ TraceId: result.TID,
+ Key: result.Key,
+ Spans: make([]*tracev1.Span, 0, len(result.Spans)),
+ SpanIds: result.SpanIDs,
}
-
// Convert each span in the trace result
for i, spanBytes := range result.Spans {
- // Create trace tags from the result
- var traceTags []*modelv1.Tag
- if result.Tags != nil &&
len(queryCriteria.TagProjection) > 0 {
- for _, tag := range result.Tags {
- if
!slices.Contains(queryCriteria.TagProjection, tag.Name) {
- continue
- }
- if i < len(tag.Values) {
- traceTags = append(traceTags,
&modelv1.Tag{
- Key: tag.Name,
- Value: tag.Values[i],
- })
- }
- }
- }
-
- // Add trace ID tag to each span if it should be
included
- // Use the group index to select the correct
traceIDTagName
- if traceIDInclusionMap[result.GroupIndex] && result.TID
!= "" {
- traceTags = append(traceTags, &modelv1.Tag{
- Key:
traceIDTagNameMap[result.GroupIndex],
- Value: &modelv1.TagValue{
- Value: &modelv1.TagValue_Str{
- Str: &modelv1.Str{
- Value:
result.TID,
- },
- },
- },
- })
- }
+ traceTags := p.buildTraceTags(&result, queryCriteria,
execPlan, i, traceIDInclusionMap, spanIDInclusionMap)
span := &tracev1.Span{
Tags: traceTags,
Span: spanBytes,
}
trace.Spans = append(trace.Spans, span)
- trace.TraceId = result.TID
- trace.Key = result.Key
}
-
traces = append(traces, trace)
}
- resp = bus.NewMessage(bus.MessageID(now),
&tracev1.InternalQueryResponse{InternalTraces: traces})
+ return traces, nil
+}
- if !queryCriteria.Trace && p.slowQuery > 0 {
- latency := time.Since(n)
- if latency > p.slowQuery {
- spanCount := 0
- for _, trace := range traces {
- spanCount += len(trace.Spans)
+func (p *traceQueryProcessor) buildTraceTags(result *model.TraceResult,
queryCriteria *tracev1.QueryRequest, execPlan *traceExecutionPlan,
+ spanIndex int, traceIDInclusionMap, spanIDInclusionMap map[int]bool,
+) []*modelv1.Tag {
+ var traceTags []*modelv1.Tag
+
+ // Create trace tags from the result
+ if result.Tags != nil && len(queryCriteria.TagProjection) > 0 {
+ for _, tag := range result.Tags {
+ if !slices.Contains(queryCriteria.TagProjection,
tag.Name) {
+ continue
+ }
+ if spanIndex < len(tag.Values) {
+ traceTags = append(traceTags, &modelv1.Tag{
+ Key: tag.Name,
+ Value: tag.Values[spanIndex],
+ })
}
- p.log.Warn().Dur("latency", latency).RawJSON("req",
logger.Proto(queryCriteria)).Int("resp_count", spanCount).Msg("trace slow
query")
}
}
+
+ // Use group index to select traceIDTagName
+ if traceIDInclusionMap[result.GroupIndex] && result.TID != "" {
+ traceTags = append(traceTags, &modelv1.Tag{
+ Key: execPlan.traceIDTagNames[result.GroupIndex],
+ Value: &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: result.TID,
+ },
+ },
+ },
+ })
+ }
+
+ // Add span ID tag to each span if it should be included
+ // Use group index to select spanIDTagName
+ if spanIDInclusionMap[result.GroupIndex] && spanIndex <
len(result.SpanIDs) {
+ traceTags = append(traceTags, &modelv1.Tag{
+ Key: execPlan.spanIDTagNames[result.GroupIndex],
+ Value: &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value:
result.SpanIDs[spanIndex],
+ },
+ },
+ },
+ })
+ }
+
+ return traceTags
+}
+
+func (p *traceQueryProcessor) logSlowQuery(queryCriteria
*tracev1.QueryRequest, traces []*tracev1.InternalTrace, startTime time.Time) {
+ if queryCriteria.Trace || p.slowQuery <= 0 {
+ return
+ }
+
+ latency := time.Since(startTime)
+ if latency <= p.slowQuery {
+ return
+ }
+
+ spanCount := 0
+ for _, trace := range traces {
+ spanCount += len(trace.Spans)
+ }
+ p.log.Warn().Dur("latency", latency).RawJSON("req",
logger.Proto(queryCriteria)).Int("resp_count", spanCount).Msg("trace slow
query")
+}
+
+func (p *traceQueryProcessor) executeQuery(ctx context.Context, queryCriteria
*tracev1.QueryRequest) (resp bus.Message) {
+ n := time.Now()
+ now := n.UnixNano()
+ defer func() {
+ if err := recover(); err != nil {
+ p.log.Error().Interface("err", err).RawJSON("req",
logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
+ resp =
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
+ }
+ }()
+
+ execPlan, setupErr := p.setupTraceExecutionPlan(queryCriteria)
+ if setupErr != nil {
+ resp = bus.NewMessage(bus.MessageID(now), setupErr)
+ return
+ }
+ traceExecContexts := make([]executor.TraceExecutionContext,
len(execPlan.executionContexts))
+ for i, ec := range execPlan.executionContexts {
+ traceExecContexts[i] = ec
+ }
+
+ plan, err := logical_trace.Analyze(queryCriteria, execPlan.metadata,
execPlan.schemas, traceExecContexts, execPlan.traceIDTagNames,
execPlan.spanIDTagNames)
+ if err != nil {
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for trace %s: %v", queryCriteria.GetName(), err))
+ return
+ }
+ if p.log.Debug().Enabled() {
+ p.log.Debug().Str("plan", plan.String()).Msg("query plan")
+ }
+
+ ctx, traceMonitor := p.setupTraceMonitor(ctx, queryCriteria, plan, n)
+ if traceMonitor != nil {
+ defer traceMonitor.finishTrace(&resp, now)
+ }
+
+ te := plan.(executor.TraceExecutable)
+ defer te.Close()
+ resultIterator, err := te.Execute(ctx)
+ if err != nil {
+ p.log.Error().Err(err).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to execute the trace query plan")
+ resp = bus.NewMessage(bus.MessageID(now),
common.NewError("execute the query plan for trace %s: %v",
queryCriteria.GetName(), err))
+ return
+ }
+
+ traces, err := p.processTraceResults(resultIterator, queryCriteria,
execPlan)
+ if err != nil {
+ p.log.Error().Err(err).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to process trace results")
+ resp = bus.NewMessage(bus.MessageID(now),
common.NewError("process trace results for trace %s: %v",
queryCriteria.GetName(), err))
+ return
+ }
+
+ resp = bus.NewMessage(bus.MessageID(now),
&tracev1.InternalQueryResponse{InternalTraces: traces})
+
+ p.logSlowQuery(queryCriteria, traces, n)
return
}
diff --git a/banyand/trace/block.go b/banyand/trace/block.go
index 22a47f80..328bdd35 100644
--- a/banyand/trace/block.go
+++ b/banyand/trace/block.go
@@ -33,10 +33,11 @@ import (
)
type block struct {
- spans [][]byte
- tags []tag
- minTS int64
- maxTS int64
+ spans [][]byte
+ tags []tag
+ spanIDs []string
+ minTS int64
+ maxTS int64
}
func (b *block) reset() {
@@ -48,11 +49,12 @@ func (b *block) reset() {
b.tags[i].reset()
}
b.tags = b.tags[:0]
+ b.spanIDs = b.spanIDs[:0]
b.minTS = 0
b.maxTS = 0
}
-func (b *block) mustInitFromTrace(spans [][]byte, tags [][]*tagValue,
timestamps []int64) {
+func (b *block) mustInitFromTrace(spans [][]byte, tags [][]*tagValue,
timestamps []int64, spanIDs []string) {
b.reset()
size := len(spans)
if size == 0 {
@@ -61,8 +63,14 @@ func (b *block) mustInitFromTrace(spans [][]byte, tags [][]*tagValue, timestamps
if size != len(tags) {
logger.Panicf("the number of spans %d must match the number of
tags %d", size, len(tags))
}
+ if size != len(spanIDs) {
+ logger.Panicf("the number of spans %d must match the number of
spanIDs %d", size, len(spanIDs))
+ }
b.spans = append(b.spans, spans...)
+ if len(spanIDs) > 0 {
+ b.spanIDs = append(b.spanIDs, spanIDs...)
+ }
b.minTS = timestamps[0]
b.maxTS = timestamps[0]
for _, ts := range timestamps {
@@ -120,7 +128,7 @@ func (b *block) mustWriteTo(tid string, bm *blockMetadata, ww *writers) {
bm.timestamps.min = b.minTS
bm.timestamps.max = b.maxTS
- mustWriteSpansTo(bm.spans, b.spans, &ww.spanWriter)
+ mustWriteSpansTo(bm.spans, b.spans, b.spanIDs, &ww.spanWriter)
for ti := range b.tags {
b.marshalTag(b.tags[ti], bm, ww)
}
@@ -223,7 +231,7 @@ func (b *block) spanSize() uint64 {
func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm
blockMetadata) {
b.reset()
- b.spans = mustReadSpansFrom(decoder, b.spans, bm.spans, int(bm.count),
p.spans)
+ b.spans, b.spanIDs = mustReadSpansFrom(decoder, b.spans, b.spanIDs,
bm.spans, int(bm.count), p.spans)
b.resizeTags(len(bm.tagProjection.Names))
for i, name := range bm.tagProjection.Names {
@@ -244,7 +252,7 @@ func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm bl
func (b *block) mustSeqReadFrom(decoder *encoding.BytesBlockDecoder,
seqReaders *seqReaders, bm blockMetadata) {
b.reset()
- b.spans = mustSeqReadSpansFrom(decoder, b.spans, bm.spans,
int(bm.count), &seqReaders.spans)
+ b.spans, b.spanIDs = mustSeqReadSpansFrom(decoder, b.spans, b.spanIDs,
bm.spans, int(bm.count), &seqReaders.spans)
b.resizeTags(len(bm.tags))
keys := make([]string, 0, len(bm.tags))
@@ -265,7 +273,7 @@ func (b *block) sortTags() {
})
}
-func mustWriteSpansTo(sm *dataBlock, spans [][]byte, spanWriter *writer) {
+func mustWriteSpansTo(sm *dataBlock, spans [][]byte, spanIDs []string,
spanWriter *writer) {
if len(spans) == 0 {
return
}
@@ -275,26 +283,47 @@ func mustWriteSpansTo(sm *dataBlock, spans [][]byte, spanWriter *writer) {
defer bigValuePool.Release(bb)
sm.offset = spanWriter.bytesWritten
+ spanIDBytes := generateSpanIDBytes(len(spanIDs))
+ defer releaseSpanIDBytes(spanIDBytes)
+ for i, id := range spanIDs {
+ (*spanIDBytes)[i] = []byte(id)
+ }
+ bb.Buf = encoding.EncodeBytesBlock(bb.Buf, *spanIDBytes)
bb.Buf = encoding.EncodeBytesBlock(bb.Buf, spans)
sm.size = uint64(len(bb.Buf))
spanWriter.MustWrite(bb.Buf)
}
-func mustReadSpansFrom(decoder *encoding.BytesBlockDecoder, spans [][]byte, sm
*dataBlock, count int, reader fs.Reader) [][]byte {
+func mustReadSpansFrom(decoder *encoding.BytesBlockDecoder, spans [][]byte,
spanIDs []string, sm *dataBlock, count int, reader fs.Reader) ([][]byte,
[]string) {
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(sm.size))
fs.MustReadData(reader, int64(sm.offset), bb.Buf)
+
+ spanIDBytes := generateSpanIDBytes(count)
+ defer releaseSpanIDBytes(spanIDBytes)
+ var err error
+ var tail []byte
+ *spanIDBytes, tail, err = decoder.DecodeWithTail((*spanIDBytes)[:0],
bb.Buf, uint64(count))
+ if err != nil {
+ logger.Panicf("cannot decode spanIDs: %v", err)
+ }
+ spanIDs = resizeSpanIDs(spanIDs, count)
+ for i, idBytes := range *spanIDBytes {
+ spanIDs[i] = string(idBytes)
+ }
+
spans = resizeSpans(spans, count)
- spans, err := decoder.Decode(spans[:0], bb.Buf, uint64(count))
+ spans, err = decoder.Decode(spans[:0], tail, uint64(count))
if err != nil {
logger.Panicf("cannot decode spans: %v", err)
}
- return spans
+
+ return spans, spanIDs
}
-func mustSeqReadSpansFrom(decoder *encoding.BytesBlockDecoder, spans [][]byte,
sm *dataBlock, count int, reader *seqReader) [][]byte {
+func mustSeqReadSpansFrom(decoder *encoding.BytesBlockDecoder, spans [][]byte,
spanIDs []string, sm *dataBlock, count int, reader *seqReader) ([][]byte,
[]string) {
if sm.offset != reader.bytesRead {
logger.Panicf("offset %d must be equal to bytesRead %d",
sm.offset, reader.bytesRead)
}
@@ -302,11 +331,26 @@ func mustSeqReadSpansFrom(decoder *encoding.BytesBlockDecoder, spans [][]byte, s
defer bigValuePool.Release(bb)
bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(sm.size))
reader.mustReadFull(bb.Buf)
- spans, err := decoder.Decode(spans[:0], bb.Buf, uint64(count))
+
+ spanIDBytes := generateSpanIDBytes(count)
+ defer releaseSpanIDBytes(spanIDBytes)
+ var err error
+ var tail []byte
+ *spanIDBytes, tail, err = decoder.DecodeWithTail((*spanIDBytes)[:0],
bb.Buf, uint64(count))
+ if err != nil {
+ logger.Panicf("cannot decode spanIDs: %v", err)
+ }
+ spanIDs = resizeSpanIDs(spanIDs, count)
+ for i, bytes := range *spanIDBytes {
+ spanIDs[i] = string(bytes)
+ }
+
+ spans, err = decoder.Decode(spans[:0], tail, uint64(count))
if err != nil {
logger.Panicf("cannot decode spans: %v", err)
}
- return spans
+
+ return spans, spanIDs
}
func resizeSpans(spans [][]byte, spansLen int) [][]byte {
@@ -318,6 +362,42 @@ func resizeSpans(spans [][]byte, spansLen int) [][]byte {
return spans
}
+func resizeSpanIDs(spanIDs []string, spanIDsLen int) []string {
+ spanIDs = spanIDs[:0]
+ if n := spanIDsLen - cap(spanIDs); n > 0 {
+ spanIDs = append(spanIDs[:cap(spanIDs)], make([]string, n)...)
+ }
+ spanIDs = spanIDs[:spanIDsLen]
+ return spanIDs
+}
+
+func generateSpanIDBytes(length int) *[][]byte {
+ v := spanIDBytesPool.Get()
+ if v == nil {
+ s := make([][]byte, length)
+ return &s
+ }
+ *v = (*v)[:0]
+ if n := length - cap(*v); n > 0 {
+ *v = append((*v)[:cap(*v)], make([][]byte, n)...)
+ }
+ *v = (*v)[:length]
+ return v
+}
+
+func releaseSpanIDBytes(s *[][]byte) {
+ if s == nil {
+ return
+ }
+ for i := range *s {
+ (*s)[i] = nil
+ }
+ *s = (*s)[:0]
+ spanIDBytesPool.Put(s)
+}
+
+var spanIDBytesPool = pool.Register[*[][]byte]("trace-spanIDBytes")
+
func generateBlock() *block {
v := blockPool.Get()
if v == nil {
@@ -336,6 +416,7 @@ var blockPool = pool.Register[*block]("trace-block")
type blockCursor struct {
p *part
spans [][]byte
+ spanIDs []string
tags []tag
tagValuesDecoder encoding.BytesBlockDecoder
tagProjection *model.TagProjection
@@ -351,6 +432,7 @@ func (bc *blockCursor) reset() {
bc.spans[i] = nil
}
bc.spans = bc.spans[:0]
+ bc.spanIDs = bc.spanIDs[:0]
for i := range bc.tags {
bc.tags[i].reset()
@@ -369,6 +451,7 @@ func (bc *blockCursor) copyAllTo(r *model.TraceResult) {
r.TID = bc.bm.traceID
r.Spans = append(r.Spans, bc.spans...)
+ r.SpanIDs = append(r.SpanIDs, bc.spanIDs...)
if len(r.Tags) != len(bc.tagProjection.Names) {
r.Tags = make([]model.Tag, len(bc.tagProjection.Names))
@@ -413,6 +496,7 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
for i := range tmpBlock.spans {
bc.spans = append(bc.spans, bytes.Clone(tmpBlock.spans[i]))
}
+ bc.spanIDs = append(bc.spanIDs, tmpBlock.spanIDs...)
for _, t := range tmpBlock.tags {
if len(t.values) == 0 {
@@ -485,6 +569,8 @@ func (bi *blockPointer) append(b *blockPointer, offset int) {
assertIdxAndOffset("spans", len(b.spans), bi.idx, offset)
bi.spans = append(bi.spans, b.spans[b.idx:offset]...)
+ assertIdxAndOffset("spanIDs", len(b.spanIDs), bi.idx, offset)
+ bi.spanIDs = append(bi.spanIDs, b.spanIDs[b.idx:offset]...)
}
func fastTagAppend(bi, b *blockPointer, offset int) error {
diff --git a/banyand/trace/block_reader_test.go b/banyand/trace/block_reader_test.go
index db95a5e2..5649c2ef 100644
--- a/banyand/trace/block_reader_test.go
+++ b/banyand/trace/block_reader_test.go
@@ -169,6 +169,7 @@ func Test_blockReader_TagTypePerPart(t *testing.T) {
timestamps: []int64{ts},
tags: [][]*tagValue{{tv}},
spans: [][]byte{[]byte("span")},
+ spanIDs: []string{"span"},
}
return tr
}
diff --git a/banyand/trace/block_test.go b/banyand/trace/block_test.go
index 7984a44e..34eec437 100644
--- a/banyand/trace/block_test.go
+++ b/banyand/trace/block_test.go
@@ -84,7 +84,8 @@ func toTagProjection(b block) []string {
}
var conventionalBlockWithTS = block{
- spans: [][]byte{[]byte("span1"), []byte("span2")},
+ spans: [][]byte{[]byte("span1"), []byte("span2")},
+ spanIDs: []string{"span1", "span2"},
tags: []tag{
{
name: "binaryTag", valueType: pbv1.ValueTypeBinaryData,
@@ -116,7 +117,8 @@ var conventionalBlockWithTS = block{
}
var conventionalBlock = block{
- spans: [][]byte{[]byte("span1"), []byte("span2")},
+ spans: [][]byte{[]byte("span1"), []byte("span2")},
+ spanIDs: []string{"span1", "span2"},
tags: []tag{
{
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
@@ -149,6 +151,7 @@ func Test_block_mustInitFromTrace(t *testing.T) {
type args struct {
timestamps []int64
spans [][]byte
+ spanIDs []string
tags [][]*tagValue
}
tests := []struct {
@@ -161,6 +164,7 @@ func Test_block_mustInitFromTrace(t *testing.T) {
args: args{
timestamps: []int64{1, 2},
spans: [][]byte{[]byte("span1"),
[]byte("span2")},
+ spanIDs: []string{"span1", "span2"},
tags: [][]*tagValue{
{
{tag: "binaryTag", valueType:
pbv1.ValueTypeBinaryData, value: longText, valueArr: nil},
@@ -186,7 +190,7 @@ func Test_block_mustInitFromTrace(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := &block{}
- b.mustInitFromTrace(tt.args.spans, tt.args.tags,
tt.args.timestamps)
+ b.mustInitFromTrace(tt.args.spans, tt.args.tags,
tt.args.timestamps, tt.args.spanIDs)
if !reflect.DeepEqual(*b, tt.want) {
t.Errorf("block.mustInitFromTrace() = %+v, want
%+v", *b, tt.want)
}
@@ -218,11 +222,13 @@ func Test_mustWriteAndReadSpans(t *testing.T) {
tests := []struct {
name string
spans [][]byte
+ spanIDs []string
wantPanic bool
}{
{
- name: "Test mustWriteAndReadSpans",
- spans: [][]byte{[]byte("span1"), []byte("span2"),
[]byte("span3")},
+ name: "Test mustWriteAndReadSpans",
+ spans: [][]byte{[]byte("span1"), []byte("span2"),
[]byte("span3")},
+ spanIDs: []string{"id1", "id2", "id3"},
},
}
decoder := &encoding.BytesBlockDecoder{}
@@ -238,11 +244,14 @@ func Test_mustWriteAndReadSpans(t *testing.T) {
b := &bytes.Buffer{}
w := new(writer)
w.init(b)
- mustWriteSpansTo(sm, tt.spans, w)
- spans := mustReadSpansFrom(decoder, nil, sm,
len(tt.spans), b)
+ mustWriteSpansTo(sm, tt.spans, tt.spanIDs, w)
+ spans, spanIDs := mustReadSpansFrom(decoder, nil, nil,
sm, len(tt.spans), b)
if !reflect.DeepEqual(spans, tt.spans) {
t.Errorf("mustReadSpansFrom() spans = %v, want
%v", spans, tt.spans)
}
+ if !reflect.DeepEqual(spanIDs, tt.spanIDs) {
+ t.Errorf("mustReadSpansFrom() spanIDs = %v,
want %v", spanIDs, tt.spanIDs)
+ }
})
}
}
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index 732685e0..264cade8 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -194,14 +194,14 @@ func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path string
bw.writers.spanWriter.init(fs.MustCreateFile(fileSystem,
filepath.Join(path, spansFilename), storage.FilePerm, shouldCache))
}
-func (bw *blockWriter) MustWriteTrace(tid string, spans [][]byte, tags
[][]*tagValue, timestamps []int64) {
+func (bw *blockWriter) MustWriteTrace(tid string, spans [][]byte, tags
[][]*tagValue, timestamps []int64, spanIDs []string) {
if len(spans) == 0 {
return
}
b := generateBlock()
defer releaseBlock(b)
- b.mustInitFromTrace(spans, tags, timestamps)
+ b.mustInitFromTrace(spans, tags, timestamps, spanIDs)
bw.mustWriteBlock(tid, b)
}
diff --git a/banyand/trace/merger_test.go b/banyand/trace/merger_test.go
index a76b4486..33f1209d 100644
--- a/banyand/trace/merger_test.go
+++ b/banyand/trace/merger_test.go
@@ -61,7 +61,8 @@ func Test_mergeTwoBlocks(t *testing.T) {
name: "Merge two non-empty blocks without overlap",
left: &blockPointer{
block: block{
- spans: [][]byte{[]byte("span1"),
[]byte("span2")},
+ spans: [][]byte{[]byte("span1"),
[]byte("span2")},
+ spanIDs: []string{"span1", "span2"},
tags: []tag{
{
name: "strArrTag",
valueType: pbv1.ValueTypeStrArr,
@@ -72,7 +73,8 @@ func Test_mergeTwoBlocks(t *testing.T) {
},
right: &blockPointer{
block: block{
- spans: [][]byte{[]byte("span3"),
[]byte("span4")},
+ spans: [][]byte{[]byte("span3"),
[]byte("span4")},
+ spanIDs: []string{"span3", "span4"},
tags: []tag{
{
name: "strArrTag",
valueType: pbv1.ValueTypeStrArr,
@@ -97,7 +99,8 @@ func Test_mergeTwoBlocks(t *testing.T) {
}
var mergedBlock = block{
- spans: [][]byte{[]byte("span1"), []byte("span2"), []byte("span3"),
[]byte("span4")},
+ spans: [][]byte{[]byte("span1"), []byte("span2"), []byte("span3"),
[]byte("span4")},
+ spanIDs: []string{"span1", "span2", "span3", "span4"},
tags: []tag{
{
name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index 973f5d95..db741a49 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -456,14 +456,14 @@ func (mp *memPart) mustInitFromTraces(ts *traces) {
}
if uncompressedSpansSizeBytes >= maxUncompressedSpanSize || tid
!= tidPrev {
- bsw.MustWriteTrace(tidPrev, ts.spans[indexPrev:i],
ts.tags[indexPrev:i], ts.timestamps[indexPrev:i])
+ bsw.MustWriteTrace(tidPrev, ts.spans[indexPrev:i],
ts.tags[indexPrev:i], ts.timestamps[indexPrev:i], ts.spanIDs[indexPrev:i])
tidPrev = tid
indexPrev = i
uncompressedSpansSizeBytes = 0
}
uncompressedSpansSizeBytes += uint64(len(ts.spans[i]))
}
- bsw.MustWriteTrace(tidPrev, ts.spans[indexPrev:], ts.tags[indexPrev:],
ts.timestamps[indexPrev:])
+ bsw.MustWriteTrace(tidPrev, ts.spans[indexPrev:], ts.tags[indexPrev:],
ts.timestamps[indexPrev:], ts.spanIDs[indexPrev:])
bsw.Flush(&mp.partMetadata, &mp.traceIDFilter, &mp.tagType)
releaseBlockWriter(bsw)
}
diff --git a/banyand/trace/part_test.go b/banyand/trace/part_test.go
index e741f9e9..ecc84ca1 100644
--- a/banyand/trace/part_test.go
+++ b/banyand/trace/part_test.go
@@ -43,6 +43,7 @@ func TestMustInitFromTraces(t *testing.T) {
timestamps: []int64{},
tags: [][]*tagValue{},
spans: [][]byte{},
+ spanIDs: []string{},
},
want: partMetadata{},
},
@@ -57,7 +58,8 @@ func TestMustInitFromTraces(t *testing.T) {
{tag: "intArrTag", valueType:
pbv1.ValueTypeInt64Arr, value: nil, valueArr:
[][]byte{convert.Int64ToBytes(25), convert.Int64ToBytes(30)}},
},
},
- spans: [][]byte{[]byte("span1")},
+ spans: [][]byte{[]byte("span1")},
+ spanIDs: []string{"span1"},
},
want: partMetadata{
BlocksCount: 1,
@@ -156,6 +158,7 @@ var ts = &traces{
[]byte("span5"),
[]byte("span6"),
},
+ spanIDs: []string{"span1", "span2", "span3", "span4", "span5", "span6"},
}
func TestMustInitFromPart(t *testing.T) {
@@ -238,6 +241,7 @@ func Test_memPart_Marshal_Unmarshal(t *testing.T) {
[]byte("span2"),
[]byte("span3"),
},
+ spanIDs: []string{"span1", "span2", "span3"},
}
mp.mustInitFromTraces(tsData)
diff --git a/banyand/trace/query_test.go b/banyand/trace/query_test.go
index 0d34ff2f..feb3119e 100644
--- a/banyand/trace/query_test.go
+++ b/banyand/trace/query_test.go
@@ -63,7 +63,8 @@ func TestQueryResult(t *testing.T) {
{Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value1")}},
{Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(10)}},
},
- Spans: [][]byte{[]byte("span1")},
+ Spans: [][]byte{[]byte("span1")},
+ SpanIDs: []string{"span1"},
}},
},
{
@@ -80,7 +81,8 @@ func TestQueryResult(t *testing.T) {
{Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value1"), strTagValue("value4")}},
{Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(10), int64TagValue(40)}},
},
- Spans: [][]byte{[]byte("span1"),
[]byte("span4")},
+ Spans: [][]byte{[]byte("span1"),
[]byte("span4")},
+ SpanIDs: []string{"span1", "span4"},
}},
},
}
diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go
index b8129008..1fc5b5ab 100644
--- a/banyand/trace/traces.go
+++ b/banyand/trace/traces.go
@@ -104,6 +104,7 @@ type traces struct {
timestamps []int64
tags [][]*tagValue
spans [][]byte
+ spanIDs []string
}
func (t *traces) reset() {
@@ -116,6 +117,7 @@ func (t *traces) reset() {
}
t.tags = t.tags[:0]
t.spans = t.spans[:0]
+ t.spanIDs = t.spanIDs[:0]
}
func (t *traces) Len() int {
@@ -131,6 +133,7 @@ func (t *traces) Swap(i, j int) {
t.timestamps[i], t.timestamps[j] = t.timestamps[j], t.timestamps[i]
t.tags[i], t.tags[j] = t.tags[j], t.tags[i]
t.spans[i], t.spans[j] = t.spans[j], t.spans[i]
+ t.spanIDs[i], t.spanIDs[j] = t.spanIDs[j], t.spanIDs[i]
}
func generateTraces() *traces {
diff --git a/banyand/trace/tstable_test.go b/banyand/trace/tstable_test.go
index 6b7ef6bb..6652aa9c 100644
--- a/banyand/trace/tstable_test.go
+++ b/banyand/trace/tstable_test.go
@@ -50,6 +50,7 @@ func Test_tsTable_mustAddTraces(t *testing.T) {
timestamps: []int64{},
tags: [][]*tagValue{},
spans: [][]byte{},
+ spanIDs: []string{},
},
},
want: 0,
@@ -66,7 +67,8 @@ func Test_tsTable_mustAddTraces(t *testing.T) {
{tag: "intArrTag",
valueType: pbv1.ValueTypeInt64Arr, value: nil, valueArr:
[][]byte{convert.Int64ToBytes(25), convert.Int64ToBytes(30)}},
},
},
- spans: [][]byte{[]byte("span1")},
+ spans: [][]byte{[]byte("span1")},
+ spanIDs: []string{"span1"},
},
},
want: 1,
@@ -241,7 +243,8 @@ var tsTS1 = &traces{
{tag: "intTag", valueType: pbv1.ValueTypeInt64, value:
convert.Int64ToBytes(30), valueArr: nil},
},
},
- spans: [][]byte{[]byte("span1"), []byte("span2"), []byte("span3")},
+ spans: [][]byte{[]byte("span1"), []byte("span2"), []byte("span3")},
+ spanIDs: []string{"span1", "span2", "span3"},
}
var tsTS2 = &traces{
@@ -264,7 +267,8 @@ var tsTS2 = &traces{
{tag: "intTag", valueType: pbv1.ValueTypeInt64, value:
convert.Int64ToBytes(60), valueArr: nil},
},
},
- spans: [][]byte{[]byte("span4"), []byte("span5"), []byte("span6")},
+ spans: [][]byte{[]byte("span4"), []byte("span5"), []byte("span6")},
+ spanIDs: []string{"span4", "span5", "span6"},
}
func generateHugeTraces(num int) *traces {
@@ -273,6 +277,7 @@ func generateHugeTraces(num int) *traces {
timestamps: []int64{},
tags: [][]*tagValue{},
spans: [][]byte{},
+ spanIDs: []string{},
}
for i := 1; i <= num; i++ {
traces.traceIDs = append(traces.traceIDs, "trace1")
@@ -285,6 +290,7 @@ func generateHugeTraces(num int) *traces {
{tag: "intTag", valueType: pbv1.ValueTypeInt64, value:
convert.Int64ToBytes(30), valueArr: nil},
})
traces.spans = append(traces.spans, []byte("span1"))
+ traces.spanIDs = append(traces.spanIDs, "span1")
}
traces.traceIDs = append(traces.traceIDs, []string{"trace2",
"trace3"}...)
traces.timestamps = append(traces.timestamps, []int64{int64(num + 1),
int64(num + 2)}...)
@@ -296,5 +302,6 @@ func generateHugeTraces(num int) *traces {
{}, // empty tags
}...)
traces.spans = append(traces.spans, [][]byte{[]byte("span2"),
[]byte("span3")}...)
+ traces.spanIDs = append(traces.spanIDs, []string{"span2", "span3"}...)
return traces
}
diff --git a/banyand/trace/write_standalone.go
b/banyand/trace/write_standalone.go
index 89db05fa..18e09722 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -170,21 +170,26 @@ func (w *writeCallback) prepareTracesInTable(eg
*tracesInGroup, writeEvent *trac
return et, nil
}
-func processTraces(schemaRepo *schemaRepo, tracesInTable *tracesInTable,
writeEvent *tracev1.InternalWriteRequest) error {
- req := writeEvent.Request
- stm, ok := schemaRepo.loadTrace(req.GetMetadata())
- if !ok {
- return fmt.Errorf("cannot find trace definition: %s",
req.GetMetadata())
- }
-
+func extractTraceSpanInfo(stm *trace, tracesInTable *tracesInTable, req
*tracev1.WriteRequest) (string, error) {
idx, err := getTagIndex(stm, stm.schema.TraceIdTagName)
if err != nil {
- return err
+ return "", err
}
traceID := req.Tags[idx].GetStr().GetValue()
tracesInTable.traces.traceIDs = append(tracesInTable.traces.traceIDs,
traceID)
+
+ idx, err = getTagIndex(stm, stm.schema.SpanIdTagName)
+ if err != nil {
+ return "", err
+ }
+ spanID := req.Tags[idx].GetStr().GetValue()
+ tracesInTable.traces.spanIDs = append(tracesInTable.traces.spanIDs,
spanID)
tracesInTable.traces.spans = append(tracesInTable.traces.spans,
req.Span)
+ return traceID, nil
+}
+
+func validateTags(stm *trace, req *tracev1.WriteRequest) error {
tLen := len(req.GetTags())
if tLen < 1 {
return fmt.Errorf("%s has no tag family", req)
@@ -199,12 +204,17 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable
*tracesInTable, writeEv
len(is.indexRuleLocators),
len(stm.GetSchema().GetTags()))
}
+ return nil
+}
+
+func buildTagsAndMap(stm *trace, tracesInTable *tracesInTable, req
*tracev1.WriteRequest) ([]*tagValue, map[string]*tagValue) {
tags := make([]*tagValue, 0, len(stm.schema.Tags))
tagMap := make(map[string]*tagValue, len(stm.schema.Tags))
tagSpecs := stm.GetSchema().GetTags()
+
for i := range tagSpecs {
tagSpec := tagSpecs[i]
- if tagSpec.Name == stm.schema.TraceIdTagName {
+ if tagSpec.Name == stm.schema.TraceIdTagName || tagSpec.Name ==
stm.schema.SpanIdTagName {
continue
}
if tagSpec.Name == stm.schema.TimestampTagName {
@@ -223,6 +233,10 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable
*tracesInTable, writeEv
}
tracesInTable.traces.tags = append(tracesInTable.traces.tags, tags)
+ return tags, tagMap
+}
+
+func buildSidxTags(tags []*tagValue) []sidx.Tag {
sidxTags := make([]sidx.Tag, 0, len(tags))
for _, tag := range tags {
if tag.valueArr != nil {
@@ -239,7 +253,10 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable
*tracesInTable, writeEv
})
}
}
+ return sidxTags
+}
+func processIndexRules(stm *trace, tracesInTable *tracesInTable, req
*tracev1.WriteRequest, traceID string, tagMap map[string]*tagValue, sidxTags
[]sidx.Tag) error {
indexRules := stm.GetIndexRules()
for _, indexRule := range indexRules {
tagName := indexRule.Tags[len(indexRule.Tags)-1]
@@ -331,6 +348,28 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable
*tracesInTable, writeEv
return nil
}
+func processTraces(schemaRepo *schemaRepo, tracesInTable *tracesInTable,
writeEvent *tracev1.InternalWriteRequest) error {
+ req := writeEvent.Request
+ stm, ok := schemaRepo.loadTrace(req.GetMetadata())
+ if !ok {
+ return fmt.Errorf("cannot find trace definition: %s",
req.GetMetadata())
+ }
+
+ traceID, err := extractTraceSpanInfo(stm, tracesInTable, req)
+ if err != nil {
+ return err
+ }
+
+ if err := validateTags(stm, req); err != nil {
+ return err
+ }
+
+ tags, tagMap := buildTagsAndMap(stm, tracesInTable, req)
+ sidxTags := buildSidxTags(tags)
+
+ return processIndexRules(stm, tracesInTable, req, traceID, tagMap,
sidxTags)
+}
+
func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp
bus.Message) {
events, ok := message.Data().([]any)
if !ok {
diff --git a/bydbctl/internal/cmd/trace_test.go
b/bydbctl/internal/cmd/trace_test.go
index 18b33a2a..8d74cad6 100644
--- a/bydbctl/internal/cmd/trace_test.go
+++ b/bydbctl/internal/cmd/trace_test.go
@@ -80,9 +80,12 @@ metadata:
tags:
- name: trace_id
type: TAG_TYPE_STRING
+ - name: span_id
+ type: TAG_TYPE_STRING
- name: timestamp
type: TAG_TYPE_TIMESTAMP
trace_id_tag_name: trace_id
+span_id_tag_name: span_id
timestamp_tag_name: timestamp`))
return capturer.CaptureStdout(func() {
err := rootCmd.Execute()
@@ -106,8 +109,9 @@ timestamp_tag_name: timestamp`))
Expect(resp.Trace.Metadata.Group).To(Equal("group1"))
Expect(resp.Trace.Metadata.Name).To(Equal("name1"))
Expect(resp.Trace.TraceIdTagName).To(Equal("trace_id"))
+ Expect(resp.Trace.SpanIdTagName).To(Equal("span_id"))
Expect(resp.Trace.TimestampTagName).To(Equal("timestamp"))
- Expect(resp.Trace.Tags).To(HaveLen(2))
+ Expect(resp.Trace.Tags).To(HaveLen(3))
})
It("update trace schema", func() {
@@ -119,11 +123,14 @@ metadata:
tags:
- name: trace_id
type: TAG_TYPE_STRING
+ - name: span_id
+ type: TAG_TYPE_STRING
- name: timestamp
type: TAG_TYPE_TIMESTAMP
- name: service_name
type: TAG_TYPE_STRING
trace_id_tag_name: trace_id
+span_id_tag_name: span_id
timestamp_tag_name: timestamp`))
out := capturer.CaptureStdout(func() {
err := rootCmd.Execute()
@@ -139,8 +146,8 @@ timestamp_tag_name: timestamp`))
helpers.UnmarshalYAML([]byte(out), resp)
Expect(resp.Trace.Metadata.Group).To(Equal("group1"))
Expect(resp.Trace.Metadata.Name).To(Equal("name1"))
- Expect(resp.Trace.Tags).To(HaveLen(3))
- Expect(resp.Trace.Tags[2].Name).To(Equal("service_name"))
+ Expect(resp.Trace.Tags).To(HaveLen(4))
+ Expect(resp.Trace.Tags[3].Name).To(Equal("service_name"))
})
It("delete trace schema", func() {
@@ -167,9 +174,12 @@ metadata:
tags:
- name: trace_id
type: TAG_TYPE_STRING
+ - name: span_id
+ type: TAG_TYPE_STRING
- name: timestamp
type: TAG_TYPE_TIMESTAMP
trace_id_tag_name: trace_id
+span_id_tag_name: span_id
timestamp_tag_name: timestamp`))
out := capturer.CaptureStdout(func() {
err := rootCmd.Execute()
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 0ec62205..1feb8f86 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -1568,6 +1568,7 @@ InternalTrace is the trace that is used for internal use.
| spans | [Span](#banyandb-trace-v1-Span) | repeated | spans are the spans
that belong to this trace. |
| trace_id | [string](#string) | | trace_id is the unique identifier of the
trace. |
| key | [int64](#int64) | | key is used for sorting. |
+| span_ids | [string](#string) | repeated | span_ids are the ids of the spans
that belong to the trace. |
@@ -2483,6 +2484,7 @@ while the group of a Trace corresponds to a physical
directory.
| trace_id_tag_name | [string](#string) | | trace_id_tag_name is the name of
the tag that stores the trace ID. |
| timestamp_tag_name | [string](#string) | | timestamp_tag_name is the name
of the tag that stores the timestamp. |
| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | |
updated_at indicates when the trace resource is updated. |
+| span_id_tag_name | [string](#string) | | span_id_tag_name is the name of
the tag that stores the span ID. |
diff --git a/pkg/encoding/bytes.go b/pkg/encoding/bytes.go
index 7650bf15..5130cdcf 100644
--- a/pkg/encoding/bytes.go
+++ b/pkg/encoding/bytes.go
@@ -114,6 +114,42 @@ func (bbd *BytesBlockDecoder) Decode(dst [][]byte, src
[]byte, itemsCount uint64
return dst, nil
}
+// DecodeWithTail decodes a block of strings from src and returns the
remaining tail.
+func (bbd *BytesBlockDecoder) DecodeWithTail(dst [][]byte, src []byte,
itemsCount uint64) ([][]byte, []byte, error) {
+ u64List := GenerateUint64List(0)
+ defer ReleaseUint64List(u64List)
+
+ var tail []byte
+ var err error
+ u64List.L, tail, err = DecodeUint64Block(u64List.L[:0], src, itemsCount)
+ if err != nil {
+ return dst, nil, fmt.Errorf("cannot decode string lengths: %w",
err)
+ }
+ aLens := u64List.L
+ src = tail
+
+ dataLen := len(bbd.data)
+ bbd.data, tail, err = decompressBlock(bbd.data, src)
+ if err != nil {
+ return dst, tail, fmt.Errorf("cannot decode bytes block with
strings: %w", err)
+ }
+
+ data := bbd.data[dataLen:]
+ for _, sLen := range aLens {
+ if uint64(len(data)) < sLen {
+ return dst, tail, fmt.Errorf("cannot decode a string
with the length %d bytes from %d bytes", sLen, len(data))
+ }
+ if sLen == 0 {
+ dst = append(dst, nil)
+ continue
+ }
+ dst = append(dst, data[:sLen])
+ data = data[sLen:]
+ }
+
+ return dst, tail, nil
+}
+
// EncodeUint64Block encodes a block of uint64 values into dst.
func EncodeUint64Block(dst []byte, a []uint64) []byte {
bb := bbPool.Generate()
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index 9480548c..1e4d0af7 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -82,7 +82,7 @@ func BuildSimpleTagFilter(criteria *modelv1.Criteria)
(TagFilter, error) {
// BuildTagFilter returns a TagFilter.
func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int,
schema Schema,
- indexChecker IndexChecker, hasGlobalIndex bool, globalTagName string,
+ indexChecker IndexChecker, hasGlobalIndex bool, skippedTagNames
...string,
) (TagFilter, error) {
if criteria == nil {
return DummyFilter, nil
@@ -100,17 +100,19 @@ func BuildTagFilter(criteria *modelv1.Criteria,
entityDict map[string]int, schem
if _, ok := entityDict[cond.Name]; ok && !hasGlobalIndex {
return DummyFilter, nil
}
- if cond.Name == globalTagName {
- return DummyFilter, nil
+ for _, skippedTagName := range skippedTagNames {
+ if cond.Name == skippedTagName {
+ return DummyFilter, nil
+ }
}
return parseFilter(cond, expr, schema, indexChecker)
case *modelv1.Criteria_Le:
le := criteria.GetLe()
- left, err := BuildTagFilter(le.Left, entityDict, schema,
indexChecker, hasGlobalIndex, globalTagName)
+ left, err := BuildTagFilter(le.Left, entityDict, schema,
indexChecker, hasGlobalIndex, skippedTagNames...)
if err != nil {
return nil, err
}
- right, err := BuildTagFilter(le.Right, entityDict, schema,
indexChecker, hasGlobalIndex, globalTagName)
+ right, err := BuildTagFilter(le.Right, entityDict, schema,
indexChecker, hasGlobalIndex, skippedTagNames...)
if err != nil {
return nil, err
}
diff --git a/pkg/query/logical/trace/index_filter.go
b/pkg/query/logical/trace/index_filter.go
index d99a0253..c3658080 100644
--- a/pkg/query/logical/trace/index_filter.go
+++ b/pkg/query/logical/trace/index_filter.go
@@ -36,7 +36,7 @@ import (
// Trace creates explicit index rules for skipping index on all tags that
don't belong to entity.
// Returns min/max int64 values for the orderByTag if provided, otherwise
returns math.MaxInt64, math.MinInt64.
func buildFilter(criteria *modelv1.Criteria, schema logical.Schema, tagNames
map[string]bool,
- entityDict map[string]int, entity []*modelv1.TagValue, traceIDTagName
string, orderByTag string,
+ entityDict map[string]int, entity []*modelv1.TagValue, traceIDTagName,
spanIDTagName string, orderByTag string,
) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64,
error) {
if criteria == nil {
return nil, [][]*modelv1.TagValue{entity}, nil, nil,
math.MinInt64, math.MaxInt64, nil
@@ -44,9 +44,9 @@ func buildFilter(criteria *modelv1.Criteria, schema
logical.Schema, tagNames map
switch criteria.GetExp().(type) {
case *modelv1.Criteria_Condition:
- return buildFilterFromCondition(criteria.GetCondition(),
schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
+ return buildFilterFromCondition(criteria.GetCondition(),
schema, tagNames, entityDict, entity, traceIDTagName, spanIDTagName, orderByTag)
case *modelv1.Criteria_Le:
- return buildFilterFromLogicalExpression(criteria.GetLe(),
schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
+ return buildFilterFromLogicalExpression(criteria.GetLe(),
schema, tagNames, entityDict, entity, traceIDTagName, spanIDTagName, orderByTag)
}
return nil, nil, nil, nil, math.MinInt64, math.MaxInt64,
logical.ErrInvalidCriteriaType
@@ -278,9 +278,9 @@ func (thf *traceHavingFilter) String() string {
return thf.op + ":" + thf.tagName
}
-// extractTraceIDsFromCondition extracts trace IDs from equal and in
conditions.
-func extractTraceIDsFromCondition(cond *modelv1.Condition) []string {
- var traceIDs []string
+// extractIDsFromCondition extracts IDs from equal and in conditions.
+func extractIDsFromCondition(cond *modelv1.Condition) []string {
+ var ids []string
switch cond.Op {
case modelv1.Condition_BINARY_OP_EQ:
@@ -288,11 +288,11 @@ func extractTraceIDsFromCondition(cond
*modelv1.Condition) []string {
switch val := cond.Value.Value.(type) {
case *modelv1.TagValue_Str:
if val.Str != nil {
- traceIDs = append(traceIDs,
val.Str.Value)
+ ids = append(ids, val.Str.Value)
}
case *modelv1.TagValue_StrArray:
if val.StrArray != nil {
- traceIDs = append(traceIDs,
val.StrArray.Value...)
+ ids = append(ids, val.StrArray.Value...)
}
}
}
@@ -301,26 +301,26 @@ func extractTraceIDsFromCondition(cond
*modelv1.Condition) []string {
switch val := cond.Value.Value.(type) {
case *modelv1.TagValue_StrArray:
if val.StrArray != nil {
- traceIDs = append(traceIDs,
val.StrArray.Value...)
+ ids = append(ids, val.StrArray.Value...)
}
case *modelv1.TagValue_Str:
if val.Str != nil {
- traceIDs = append(traceIDs,
val.Str.Value)
+ ids = append(ids, val.Str.Value)
}
}
}
case modelv1.Condition_BINARY_OP_NE, modelv1.Condition_BINARY_OP_LT,
modelv1.Condition_BINARY_OP_GT,
modelv1.Condition_BINARY_OP_LE, modelv1.Condition_BINARY_OP_GE,
modelv1.Condition_BINARY_OP_HAVING,
modelv1.Condition_BINARY_OP_NOT_HAVING,
modelv1.Condition_BINARY_OP_NOT_IN, modelv1.Condition_BINARY_OP_MATCH:
- // These operations don't support trace ID extraction
+ // These operations don't support ID extraction
}
- return traceIDs
+ return ids
}
// buildFilterFromCondition handles single condition filtering and min/max
extraction.
func buildFilterFromCondition(cond *modelv1.Condition, schema logical.Schema,
tagNames map[string]bool, entityDict map[string]int,
- entity []*modelv1.TagValue, traceIDTagName, orderByTag string,
+ entity []*modelv1.TagValue, traceIDTagName, spanIDTagName, orderByTag
string,
) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64,
error) {
var collectedTagNames []string
var traceIDs []string
@@ -343,8 +343,8 @@ func buildFilterFromCondition(cond *modelv1.Condition,
schema logical.Schema, ta
}
if cond.Name == traceIDTagName && (cond.Op ==
modelv1.Condition_BINARY_OP_EQ || cond.Op == modelv1.Condition_BINARY_OP_IN) {
- traceIDs = extractTraceIDsFromCondition(cond)
- } else {
+ traceIDs = extractIDsFromCondition(cond)
+ } else if cond.Name != spanIDTagName {
collectedTagNames = append(collectedTagNames, cond.Name)
}
@@ -366,7 +366,7 @@ func buildFilterFromCondition(cond *modelv1.Condition,
schema logical.Schema, ta
// buildFilterFromLogicalExpression handles logical expression (AND/OR)
filtering and min/max extraction.
func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, schema
logical.Schema, tagNames map[string]bool, entityDict map[string]int,
- entity []*modelv1.TagValue, traceIDTagName, orderByTag string,
+ entity []*modelv1.TagValue, traceIDTagName, spanIDTagName, orderByTag
string,
) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64,
error) {
var collectedTagNames []string
var traceIDs []string
@@ -377,17 +377,19 @@ func buildFilterFromLogicalExpression(le
*modelv1.LogicalExpression, schema logi
return nil, nil, nil, traceIDs, minVal, maxVal,
errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and
right) of [%v] are empty", le)
}
if le.GetLeft() == nil {
- return buildFilter(le.Right, schema, tagNames, entityDict,
entity, traceIDTagName, orderByTag)
+ return buildFilter(le.Right, schema, tagNames, entityDict,
entity, traceIDTagName, spanIDTagName, orderByTag)
}
if le.GetRight() == nil {
- return buildFilter(le.Left, schema, tagNames, entityDict,
entity, traceIDTagName, orderByTag)
+ return buildFilter(le.Left, schema, tagNames, entityDict,
entity, traceIDTagName, spanIDTagName, orderByTag)
}
- left, leftEntities, leftTagNames, leftTraceIDs, leftMin, leftMax, err
:= buildFilter(le.Left, schema, tagNames, entityDict, entity, traceIDTagName,
orderByTag)
+ left, leftEntities, leftTagNames, leftTraceIDs, leftMin, leftMax, err
:= buildFilter(le.Left, schema, tagNames, entityDict, entity,
+ traceIDTagName, spanIDTagName, orderByTag)
if err != nil {
return nil, nil, leftTagNames, leftTraceIDs, minVal, maxVal, err
}
- right, rightEntities, rightTagNames, rightTraceIDs, rightMin, rightMax,
err := buildFilter(le.Right, schema, tagNames, entityDict, entity,
traceIDTagName, orderByTag)
+ right, rightEntities, rightTagNames, rightTraceIDs, rightMin, rightMax,
err := buildFilter(le.Right, schema, tagNames, entityDict, entity,
+ traceIDTagName, spanIDTagName, orderByTag)
if err != nil {
return nil, nil, append(leftTagNames, rightTagNames...),
append(leftTraceIDs, rightTraceIDs...), minVal, maxVal, err
}
diff --git a/pkg/query/logical/trace/trace_analyzer.go
b/pkg/query/logical/trace/trace_analyzer.go
index f9d56725..3d22951c 100644
--- a/pkg/query/logical/trace/trace_analyzer.go
+++ b/pkg/query/logical/trace/trace_analyzer.go
@@ -33,7 +33,7 @@ const defaultLimit uint32 = 20
// Analyze converts logical expressions to executable operation tree
represented by Plan.
func Analyze(criteria *tracev1.QueryRequest, metadata []*commonv1.Metadata, ss
[]logical.Schema,
- ecc []executor.TraceExecutionContext, traceIDTagNames []string,
+ ecc []executor.TraceExecutionContext, traceIDTagNames, spanIDTagNames
[]string,
) (logical.Plan, error) {
if len(metadata) != len(ss) {
return nil, fmt.Errorf("number of schemas %d not equal to
number of metadata %d", len(ss), len(metadata))
@@ -54,7 +54,7 @@ func Analyze(criteria *tracev1.QueryRequest, metadata
[]*commonv1.Metadata, ss [
}
orderByTag = indexRule.Tags[len(indexRule.Tags)-1]
}
- plan = parseTraceTags(criteria, metadata[0], ecc[0],
tagProjection, traceIDTagNames[0], orderByTag, 0)
+ plan = parseTraceTags(criteria, metadata[0], ecc[0],
tagProjection, traceIDTagNames[0], spanIDTagNames[0], orderByTag, 0)
s = ss[0]
} else {
var err error
@@ -67,6 +67,7 @@ func Analyze(criteria *tracev1.QueryRequest, metadata
[]*commonv1.Metadata, ss [
ecc: ecc,
tagProjection: tagProjection,
traceIDTagNames: traceIDTagNames,
+ spanIDTagNames: spanIDTagNames,
}
}
@@ -223,7 +224,7 @@ func newTraceLimit(input logical.UnresolvedPlan, offset,
num uint32) logical.Unr
}
func parseTraceTags(criteria *tracev1.QueryRequest, metadata
*commonv1.Metadata,
- ec executor.TraceExecutionContext, tagProjection [][]*logical.Tag,
traceIDTagName, orderByTag string, groupIndex int,
+ ec executor.TraceExecutionContext, tagProjection [][]*logical.Tag,
traceIDTagName, spanIDTagName, orderByTag string, groupIndex int,
) logical.UnresolvedPlan {
timeRange := criteria.GetTimeRange()
return &unresolvedTraceTagFilter{
@@ -234,6 +235,7 @@ func parseTraceTags(criteria *tracev1.QueryRequest,
metadata *commonv1.Metadata,
projectionTags: tagProjection,
ec: ec,
traceIDTagName: traceIDTagName,
+ spanIDTagName: spanIDTagName,
orderByTag: orderByTag,
groupIndex: groupIndex,
}
diff --git a/pkg/query/logical/trace/trace_plan_distributed.go
b/pkg/query/logical/trace/trace_plan_distributed.go
index 1238967e..f21ba423 100644
--- a/pkg/query/logical/trace/trace_plan_distributed.go
+++ b/pkg/query/logical/trace/trace_plan_distributed.go
@@ -20,6 +20,7 @@ package trace
import (
"context"
"fmt"
+ "slices"
"time"
"go.uber.org/multierr"
@@ -136,13 +137,17 @@ func (p *distributedPlan) Execute(ctx context.Context)
(iter.Iterator[model.Trac
}
tracer := query.GetTracer(ctx)
var span *query.Span
+ var err error
if tracer != nil {
span, _ = tracer.StartSpan(ctx, "distributed-client")
queryRequest.Trace = true
span.Tag("request",
convert.BytesToString(logger.Proto(queryRequest)))
defer func() {
- // TODO: handle error
- span.Stop()
+ if err != nil {
+ span.Error(err)
+ } else {
+ span.Stop()
+ }
}()
}
ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicTraceQuery,
@@ -170,12 +175,19 @@ func (p *distributedPlan) Execute(ctx context.Context)
(iter.Iterator[model.Trac
}
sortIter := sort.NewItemIter(st, p.desc)
var result []*tracev1.InternalTrace
- seen := make(map[string]bool)
+ seen := make(map[string]*tracev1.InternalTrace)
for sortIter.Next() {
trace := sortIter.Val().InternalTrace
- if !seen[trace.TraceId] {
- seen[trace.TraceId] = true
+ if _, ok := seen[trace.TraceId]; !ok {
+ seen[trace.TraceId] = trace
result = append(result, trace)
+ } else {
+ for _, spanID := range trace.SpanIds {
+ if
!slices.Contains(seen[trace.TraceId].SpanIds, spanID) {
+ seen[trace.TraceId].SpanIds =
append(seen[trace.TraceId].SpanIds, spanID)
+ seen[trace.TraceId].Spans =
append(seen[trace.TraceId].Spans, trace.Spans...)
+ }
+ }
}
}
@@ -212,7 +224,6 @@ type comparableTrace struct {
func newComparableTrace(t *tracev1.InternalTrace, sortByTraceID bool)
(*comparableTrace, error) {
var sortField []byte
if sortByTraceID {
- // For traces, we use trace ID as sort field when sorting by
time
sortField = []byte(t.TraceId)
} else {
sortField = convert.Int64ToBytes(t.Key)
diff --git a/pkg/query/logical/trace/trace_plan_merge.go
b/pkg/query/logical/trace/trace_plan_merge.go
index e32f89fb..8d655094 100644
--- a/pkg/query/logical/trace/trace_plan_merge.go
+++ b/pkg/query/logical/trace/trace_plan_merge.go
@@ -42,6 +42,7 @@ type unresolvedTraceMerger struct {
ecc []executor.TraceExecutionContext
tagProjection [][]*logical.Tag
traceIDTagNames []string
+ spanIDTagNames []string
}
// Analyze implements logical.UnresolvedPlan.
@@ -74,7 +75,7 @@ func (u *unresolvedTraceMerger) Analyze(s logical.Schema)
(logical.Plan, error)
orderByTag = tags[len(tags)-1]
}
}
- subPlan := parseTraceTags(u.criteria, u.metadata[i], u.ecc[i],
u.tagProjection, u.traceIDTagNames[i], orderByTag, i)
+ subPlan := parseTraceTags(u.criteria, u.metadata[i], u.ecc[i],
u.tagProjection, u.traceIDTagNames[i], u.spanIDTagNames[i], orderByTag, i)
sp, err := subPlan.Analyze(ss[i])
if err != nil {
return nil, err
diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go
b/pkg/query/logical/trace/trace_plan_tag_filter.go
index c4a207ce..619684dc 100644
--- a/pkg/query/logical/trace/trace_plan_tag_filter.go
+++ b/pkg/query/logical/trace/trace_plan_tag_filter.go
@@ -44,6 +44,7 @@ type unresolvedTraceTagFilter struct {
metadata *commonv1.Metadata
criteria *modelv1.Criteria
traceIDTagName string
+ spanIDTagName string
orderByTag string
projectionTags [][]*logical.Tag
groupIndex int
@@ -66,7 +67,7 @@ func (uis *unresolvedTraceTagFilter) Analyze(s
logical.Schema) (logical.Plan, er
var minVal, maxVal int64
// For trace, we use skipping filter and capture entities for query
optimization
ctx.skippingFilter, entities, conditionTagNames, traceIDs, minVal,
maxVal, err = buildTraceFilter(
- uis.criteria, s, entityDict, entity, uis.traceIDTagName,
uis.orderByTag)
+ uis.criteria, s, entityDict, entity, uis.traceIDTagName,
uis.spanIDTagName, uis.orderByTag)
if err != nil {
return nil, err
}
@@ -86,7 +87,7 @@ func (uis *unresolvedTraceTagFilter) Analyze(s
logical.Schema) (logical.Plan, er
if len(uis.projectionTags) > 0 {
for i := range uis.projectionTags {
for _, tag := range uis.projectionTags[i] {
- if tag.GetTagName() == uis.traceIDTagName {
+ if tag.GetTagName() == uis.traceIDTagName ||
tag.GetTagName() == uis.spanIDTagName {
continue
}
ctx.projectionTags.Names =
append(ctx.projectionTags.Names, tag.GetTagName())
@@ -116,13 +117,14 @@ func (uis *unresolvedTraceTagFilter) Analyze(s
logical.Schema) (logical.Plan, er
}
plan := uis.selectTraceScanner(ctx, uis.ec, traceIDs, minVal, maxVal)
if uis.criteria != nil {
- tagFilter, errFilter := logical.BuildTagFilter(uis.criteria,
entityDict, s, s, len(traceIDs) > 0, uis.traceIDTagName)
+ spanIDFilter := buildSpanIDFilter(uis.criteria,
uis.spanIDTagName)
+ tagFilter, errFilter := logical.BuildTagFilter(uis.criteria,
entityDict, s, s, len(traceIDs) > 0, uis.traceIDTagName, uis.spanIDTagName)
if errFilter != nil {
return nil, errFilter
}
- if tagFilter != logical.DummyFilter {
- // create tagFilter with a projected view
- plan =
newTraceTagFilter(s.ProjTags(ctx.projTagsRefs...), plan, tagFilter)
+ if tagFilter != logical.DummyFilter || spanIDFilter != nil {
+ // create filter with a projected view
+ plan =
newTraceTagFilter(s.ProjTags(ctx.projTagsRefs...), plan, tagFilter,
spanIDFilter)
}
}
return plan, err
@@ -179,7 +181,7 @@ func deduplicateStrings(strings []string) []string {
// Unlike stream, trace only needs skipping filter.
// Returns min/max int64 values for the orderByTag if provided, otherwise
returns math.MaxInt64, math.MinInt64.
func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict
map[string]int,
- entity []*modelv1.TagValue, traceIDTagName string, orderByTag string,
+ entity []*modelv1.TagValue, traceIDTagName, spanIDTagName string,
orderByTag string,
) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64,
error) {
if criteria == nil {
return nil, [][]*modelv1.TagValue{entity}, nil, nil,
math.MinInt64, math.MaxInt64, nil
@@ -191,7 +193,7 @@ func buildTraceFilter(criteria *modelv1.Criteria, s
logical.Schema, entityDict m
tagNames[tagName] = true
}
- filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err :=
buildFilter(criteria, s, tagNames, entityDict, entity, traceIDTagName,
orderByTag)
+ filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err :=
buildFilter(criteria, s, tagNames, entityDict, entity, traceIDTagName,
spanIDTagName, orderByTag)
return filter, entities, collectedTagNames, traceIDs, minVal, maxVal,
err
}
@@ -201,20 +203,22 @@ var (
)
type traceTagFilterPlan struct {
- s logical.Schema
- parent logical.Plan
- tagFilter logical.TagFilter
+ s logical.Schema
+ parent logical.Plan
+ tagFilter logical.TagFilter
+ spanIDFilter *spanIDFilter
}
func (t *traceTagFilterPlan) Close() {
t.parent.(executor.TraceExecutable).Close()
}
-func newTraceTagFilter(s logical.Schema, parent logical.Plan, tagFilter
logical.TagFilter) logical.Plan {
+func newTraceTagFilter(s logical.Schema, parent logical.Plan, tagFilter
logical.TagFilter, spanIDFilter *spanIDFilter) logical.Plan {
return &traceTagFilterPlan{
- s: s,
- parent: parent,
- tagFilter: tagFilter,
+ s: s,
+ parent: parent,
+ tagFilter: tagFilter,
+ spanIDFilter: spanIDFilter,
}
}
@@ -229,15 +233,17 @@ func (t *traceTagFilterPlan) Execute(ctx context.Context)
(iter.Iterator[model.T
sourceIterator: resultIterator,
tagFilter: t.tagFilter,
schema: t.s,
+ spanIDFilter: t.spanIDFilter,
}, nil
}
// traceTagFilterIterator implements iter.Iterator[model.TraceResult] by lazily
-// filtering results from the source iterator using the tag filter.
+// filtering results from the source iterator using the tag filter and spanID
filter.
type traceTagFilterIterator struct {
sourceIterator iter.Iterator[model.TraceResult]
tagFilter logical.TagFilter
schema logical.Schema
+ spanIDFilter *spanIDFilter
err error
}
@@ -262,6 +268,11 @@ func (tfti *traceTagFilterIterator) Next()
(model.TraceResult, bool) {
// Check each row to see if any matches the filter
for rowIdx := 0; rowIdx < maxRows; rowIdx++ {
+ // Skip this row if spanID filter exists and doesn't
match
+ if tfti.spanIDFilter != nil &&
!tfti.spanIDFilter.matchSpanID(result.SpanIDs[rowIdx]) {
+ continue
+ }
+
// Create TagFamilies for this specific row
family := &modelv1.TagFamily{
Name: "",
@@ -285,7 +296,7 @@ func (tfti *traceTagFilterIterator) Next()
(model.TraceResult, bool) {
return model.TraceResult{}, false
}
- // If ANY row matches, return this result
+ // If both spanID and tag filters match, return this
result
if ok {
matched = true
break
@@ -311,3 +322,60 @@ func (t *traceTagFilterPlan) Children() []logical.Plan {
func (t *traceTagFilterPlan) Schema() logical.Schema {
return t.s
}
+
+type spanIDFilter struct {
+ targetSpanIDs []string
+}
+
+func newSpanIDFilter(spanIDs []string) *spanIDFilter {
+ return &spanIDFilter{
+ targetSpanIDs: spanIDs,
+ }
+}
+
+func (sf *spanIDFilter) matchSpanID(spanID string) bool {
+ for _, targetSpanID := range sf.targetSpanIDs {
+ if spanID == targetSpanID {
+ return true
+ }
+ }
+ return false
+}
+
+func buildSpanIDFilter(criteria *modelv1.Criteria, spanIDTagName string)
*spanIDFilter {
+ if criteria == nil || spanIDTagName == "" {
+ return nil
+ }
+
+ var extractSpanIDs func(*modelv1.Criteria) []string
+ extractSpanIDs = func(c *modelv1.Criteria) []string {
+ if c == nil {
+ return nil
+ }
+
+ switch c.GetExp().(type) {
+ case *modelv1.Criteria_Condition:
+ cond := c.GetCondition()
+ if cond.Name == spanIDTagName && cond.Op ==
modelv1.Condition_BINARY_OP_EQ {
+ return extractIDsFromCondition(cond)
+ }
+ case *modelv1.Criteria_Le:
+ le := c.GetLe()
+ var spanIDs []string
+ if le.Left != nil {
+ spanIDs = append(spanIDs,
extractSpanIDs(le.Left)...)
+ }
+ if le.Right != nil {
+ spanIDs = append(spanIDs,
extractSpanIDs(le.Right)...)
+ }
+ return spanIDs
+ }
+ return nil
+ }
+
+ spanIDs := extractSpanIDs(criteria)
+ if len(spanIDs) > 0 {
+ return newSpanIDFilter(spanIDs)
+ }
+ return nil
+}
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 707872bd..ae6e165b 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -357,6 +357,7 @@ type TraceResult struct {
Error error
TID string
Spans [][]byte
+ SpanIDs []string
Tags []Tag
Key int64
GroupIndex int
diff --git a/pkg/test/trace/testdata/traces/sw.json
b/pkg/test/trace/testdata/traces/sw.json
index 37e21ed3..4f501b9a 100644
--- a/pkg/test/trace/testdata/traces/sw.json
+++ b/pkg/test/trace/testdata/traces/sw.json
@@ -28,12 +28,17 @@
"name": "duration",
"type": "TAG_TYPE_INT"
},
+ {
+ "name": "span_id",
+ "type": "TAG_TYPE_STRING"
+ },
{
"name": "timestamp",
"type": "TAG_TYPE_TIMESTAMP"
}
],
"trace_id_tag_name": "trace_id",
+ "span_id_tag_name": "span_id",
"timestamp_tag_name": "timestamp",
"updated_at": "2021-04-15T01:30:15.01Z"
}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/traces/sw_updated.json
b/pkg/test/trace/testdata/traces/sw_updated.json
index 78c56e63..9d55e73b 100644
--- a/pkg/test/trace/testdata/traces/sw_updated.json
+++ b/pkg/test/trace/testdata/traces/sw_updated.json
@@ -28,6 +28,10 @@
"name": "duration",
"type": "TAG_TYPE_INT"
},
+ {
+ "name": "span_id",
+ "type": "TAG_TYPE_STRING"
+ },
{
"name": "error_message",
"type": "TAG_TYPE_STRING"
@@ -38,6 +42,7 @@
}
],
"trace_id_tag_name": "trace_id",
+ "span_id_tag_name": "span_id",
"timestamp_tag_name": "timestamp",
"updated_at": "2021-04-15T01:30:15.01Z"
}
diff --git a/pkg/test/trace/testdata/traces/zipkin.json
b/pkg/test/trace/testdata/traces/zipkin.json
index d90ee6af..bc832e0b 100644
--- a/pkg/test/trace/testdata/traces/zipkin.json
+++ b/pkg/test/trace/testdata/traces/zipkin.json
@@ -70,6 +70,7 @@
}
],
"trace_id_tag_name": "trace_id",
+ "span_id_tag_name": "span_id",
"timestamp_tag_name": "timestamp",
"updated_at": "2021-04-15T01:30:15.01Z"
}
\ No newline at end of file
diff --git a/test/cases/trace/data/testdata/sw.json
b/test/cases/trace/data/testdata/sw.json
index 826032dd..88d99890 100644
--- a/test/cases/trace/data/testdata/sw.json
+++ b/test/cases/trace/data/testdata/sw.json
@@ -30,6 +30,11 @@
"int": {
"value": 1000
}
+ },
+ {
+ "str": {
+ "value": "span_001_1"
+ }
}
],
"span": "trace_001_span_1"
@@ -65,6 +70,11 @@
"int": {
"value": 500
}
+ },
+ {
+ "str": {
+ "value": "span_001_2"
+ }
}
],
"span": "trace_001_span_2"
@@ -100,6 +110,11 @@
"int": {
"value": 300
}
+ },
+ {
+ "str": {
+ "value": "span_001_3"
+ }
}
],
"span": "trace_001_span_3"
@@ -135,6 +150,11 @@
"int": {
"value": 800
}
+ },
+ {
+ "str": {
+ "value": "span_002_1"
+ }
}
],
"span": "trace_002_span_1"
@@ -170,6 +190,11 @@
"int": {
"value": 200
}
+ },
+ {
+ "str": {
+ "value": "span_002_2"
+ }
}
],
"span": "trace_002_span_2"
@@ -205,6 +230,11 @@
"int": {
"value": 1200
}
+ },
+ {
+ "str": {
+ "value": "span_003_1"
+ }
}
],
"span": "trace_003_span_1"
@@ -240,6 +270,11 @@
"int": {
"value": 150
}
+ },
+ {
+ "str": {
+ "value": "span_003_2"
+ }
}
],
"span": "trace_003_span_2"
@@ -275,6 +310,11 @@
"int": {
"value": 400
}
+ },
+ {
+ "str": {
+ "value": "span_003_3"
+ }
}
],
"span": "trace_003_span_3"
@@ -310,6 +350,11 @@
"int": {
"value": 600
}
+ },
+ {
+ "str": {
+ "value": "span_004_1"
+ }
}
],
"span": "trace_004_span_1"
@@ -345,6 +390,11 @@
"int": {
"value": 900
}
+ },
+ {
+ "str": {
+ "value": "span_005_1"
+ }
}
],
"span": "trace_005_span_1"
@@ -380,6 +430,11 @@
"int": {
"value": 250
}
+ },
+ {
+ "str": {
+ "value": "span_005_2"
+ }
}
],
"span": "trace_005_span_2"
@@ -415,6 +470,11 @@
"int": {
"value": 350
}
+ },
+ {
+ "str": {
+ "value": "span_005_3"
+ }
}
],
"span": "trace_005_span_3"
@@ -450,6 +510,11 @@
"int": {
"value": 180
}
+ },
+ {
+ "str": {
+ "value": "span_005_4"
+ }
}
],
"span": "trace_005_span_4"
diff --git a/test/cases/trace/data/testdata/sw_mixed_traces.json
b/test/cases/trace/data/testdata/sw_mixed_traces.json
index ddaccdf6..3728fd93 100644
--- a/test/cases/trace/data/testdata/sw_mixed_traces.json
+++ b/test/cases/trace/data/testdata/sw_mixed_traces.json
@@ -30,6 +30,11 @@
"int": {
"value": 750
}
+ },
+ {
+ "str": {
+ "value": "span_001_4"
+ }
}
],
"span": "trace_001_span_4"
@@ -65,6 +70,11 @@
"int": {
"value": 320
}
+ },
+ {
+ "str": {
+ "value": "span_001_5"
+ }
}
],
"span": "trace_001_span_5"
@@ -100,6 +110,11 @@
"int": {
"value": 450
}
+ },
+ {
+ "str": {
+ "value": "span_002_3"
+ }
}
],
"span": "trace_002_span_3"
@@ -135,6 +150,11 @@
"int": {
"value": 180
}
+ },
+ {
+ "str": {
+ "value": "span_002_4"
+ }
}
],
"span": "trace_002_span_4"
@@ -170,6 +190,11 @@
"int": {
"value": 1190
}
+ },
+ {
+ "str": {
+ "value": "span_006_1"
+ }
}
],
"span": "trace_006_span_1"
@@ -205,6 +230,11 @@
"int": {
"value": 280
}
+ },
+ {
+ "str": {
+ "value": "span_006_2"
+ }
}
],
"span": "trace_006_span_2"
@@ -240,6 +270,11 @@
"int": {
"value": 150
}
+ },
+ {
+ "str": {
+ "value": "span_006_3"
+ }
}
],
"span": "trace_006_span_3"
@@ -275,6 +310,11 @@
"int": {
"value": 650
}
+ },
+ {
+ "str": {
+ "value": "span_007_1"
+ }
}
],
"span": "trace_007_span_1"
@@ -310,6 +350,11 @@
"int": {
"value": 420
}
+ },
+ {
+ "str": {
+ "value": "span_007_2"
+ }
}
],
"span": "trace_007_span_2"
@@ -345,6 +390,11 @@
"int": {
"value": 890
}
+ },
+ {
+ "str": {
+ "value": "span_008_1"
+ }
}
],
"span": "trace_008_span_1"
@@ -380,6 +430,11 @@
"int": {
"value": 340
}
+ },
+ {
+ "str": {
+ "value": "span_008_2"
+ }
}
],
"span": "trace_008_span_2"
@@ -415,6 +470,11 @@
"int": {
"value": 2100
}
+ },
+ {
+ "str": {
+ "value": "span_008_3"
+ }
}
],
"span": "trace_008_span_3"
diff --git a/test/cases/trace/data/testdata/sw_updated.json
b/test/cases/trace/data/testdata/sw_updated.json
index b4ce7da9..c323f882 100644
--- a/test/cases/trace/data/testdata/sw_updated.json
+++ b/test/cases/trace/data/testdata/sw_updated.json
@@ -31,6 +31,11 @@
"value": 1234
}
},
+ {
+ "str": {
+ "value": "span_009_1"
+ }
+ },
{
"str": {
"value": ""
@@ -71,6 +76,11 @@
"value": 500
}
},
+ {
+ "str": {
+ "value": "span_009_2"
+ }
+ },
{
"str": {
"value": ""
@@ -111,6 +121,11 @@
"value": 91011
}
},
+ {
+ "str": {
+ "value": "span_010_1"
+ }
+ },
{
"str": {
"value": "Connection timeout"
diff --git a/test/cases/trace/data/want/having_query_tag.yml
b/test/cases/trace/data/want/having_query_tag.yml
index dd3880cc..d17547bd 100644
--- a/test/cases/trace/data/want/having_query_tag.yml
+++ b/test/cases/trace/data/want/having_query_tag.yml
@@ -19,10 +19,6 @@ traces:
- spans:
- span: zipkin_trace_001_span_001
tags:
- - key: span_id
- value:
- str:
- value: span_001
- key: operation_name
value:
str:
@@ -38,12 +34,12 @@ traces:
value:
str:
value: zipkin_trace_001
- - span: zipkin_trace_001_span_002
- tags:
- key: span_id
value:
str:
- value: span_002
+ value: span_001
+ - span: zipkin_trace_001_span_002
+ tags:
- key: operation_name
value:
str:
@@ -58,4 +54,7 @@ traces:
value:
str:
value: zipkin_trace_001
-
\ No newline at end of file
+ - key: span_id
+ value:
+ str:
+ value: span_002
\ No newline at end of file
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 71747793..4df39247 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -26,7 +26,7 @@ SW_ROVER_COMMIT=4c0cb8429a96f190ea30eac1807008d523c749c3
SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3
SW_PREDICTOR_COMMIT=54a0197654a3781a6f73ce35146c712af297c994
-SW_OAP_COMMIT=88aa16feb72252c1bfe16a19412f7ed460349f8b
+SW_OAP_COMMIT=50825c5f77d097683fb83311422bbbe1d7e81ab9
SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=a2a67ca63084cddf82303155c185e3c24cf07eef
SW_CTL_COMMIT=3b675df73824bbb80e6aabf6a95d110feb37b6b1
-SW_TRACE_MOCKER_COMMIT=a2a67ca63084cddf82303155c185e3c24cf07eef
\ No newline at end of file
+SW_TRACE_MOCKER_COMMIT=a2a67ca63084cddf82303155c185e3c24cf07eef
diff --git a/test/stress/stream-vs-trace/testdata/schema/trace_schema.json
b/test/stress/stream-vs-trace/testdata/schema/trace_schema.json
index 00c2f4ad..2460e129 100644
--- a/test/stress/stream-vs-trace/testdata/schema/trace_schema.json
+++ b/test/stress/stream-vs-trace/testdata/schema/trace_schema.json
@@ -46,5 +46,6 @@
}
],
"trace_id_tag_name": "traceId",
+ "span_id_tag_name": "spanId",
"timestamp_tag_name": "startTime"
}