This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/sidx
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit f42535ef6fb7bd41a76b25e00e6cfa190fa4a079
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Aug 31 07:34:26 2025 +0800

    Enhance trace querying by integrating entity handling and series ID mapping.
---
 banyand/trace/query.go                           | 36 ++++++++++++++++++++++--
 banyand/trace/trace.go                           | 11 +++++---
 pkg/query/logical/trace/trace_plan_local.go      |  3 ++
 pkg/query/logical/trace/trace_plan_tag_filter.go | 18 ++++++++----
 pkg/query/model/model.go                         |  3 ++
 5 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 07b70afd..a0c328fc 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -26,6 +26,7 @@ import (
        "github.com/pkg/errors"
        "google.golang.org/protobuf/types/known/timestamppb"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
@@ -40,7 +41,8 @@ const checkDoneEvery = 128
 var nilResult = model.TraceQueryResult(nil)
 
 type queryOptions struct {
-       traceIDs []string
+       seriesToEntity map[common.SeriesID][]*modelv1.TagValue
+       traceIDs       []string
        model.TraceQueryOptions
        minTimestamp int64
        maxTimestamp int64
@@ -48,6 +50,7 @@ type queryOptions struct {
 
 func (qo *queryOptions) reset() {
        qo.TraceQueryOptions.Reset()
+       qo.seriesToEntity = nil
        qo.traceIDs = nil
        qo.minTimestamp = 0
        qo.maxTimestamp = 0
@@ -55,6 +58,7 @@ func (qo *queryOptions) reset() {
 
 func (qo *queryOptions) copyFrom(other *queryOptions) {
        qo.TraceQueryOptions.CopyFrom(&other.TraceQueryOptions)
+       qo.seriesToEntity = other.seriesToEntity
        qo.traceIDs = make([]string, len(other.traceIDs))
        copy(qo.traceIDs, other.traceIDs)
        qo.minTimestamp = other.minTimestamp
@@ -104,6 +108,30 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
                minTimestamp:      tqo.TimeRange.Start.UnixNano(),
                maxTimestamp:      tqo.TimeRange.End.UnixNano(),
        }
+
+       // Process entities to get series IDs for sidx queries
+       if len(tqo.Entities) > 0 {
+               // Create series from entities for lookup
+               series := make([]*pbv1.Series, len(tqo.Entities))
+               for i, entityValues := range tqo.Entities {
+                       series[i] = &pbv1.Series{
+                               Subject:      tqo.Name,
+                               EntityValues: entityValues,
+                       }
+               }
+
+               // Use segment lookup to find actual series that exist in the 
data
+               qo.seriesToEntity = 
make(map[common.SeriesID][]*modelv1.TagValue)
+               for _, segment := range segments {
+                       sl, lookupErr := segment.Lookup(ctx, series)
+                       if lookupErr != nil {
+                               return nil, fmt.Errorf("cannot lookup series: 
%w", lookupErr)
+                       }
+                       for _, s := range sl {
+                               qo.seriesToEntity[s.ID] = s.EntityValues
+                       }
+               }
+       }
        var n int
        tables := make([]*tsTable, 0)
        for _, segment := range segments {
@@ -129,7 +157,11 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
 
                if len(sidxInstances) > 0 {
                        // Query sidx for trace IDs
-                       traceIDs, sidxErr := t.querySidxForTraceIDs(ctx, 
sidxInstances, tqo)
+                       var seriesIDs []common.SeriesID
+                       for seriesID := range qo.seriesToEntity {
+                               seriesIDs = append(seriesIDs, seriesID)
+                       }
+                       traceIDs, sidxErr := t.querySidxForTraceIDs(ctx, 
sidxInstances, tqo, seriesIDs)
                        if sidxErr != nil {
                                t.l.Warn().Err(sidxErr).Str("sidx", 
sidxName).Msg("sidx query failed, falling back to normal query")
                        } else if len(traceIDs) > 0 {
diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go
index 7e7cc594..dfa552d9 100644
--- a/banyand/trace/trace.go
+++ b/banyand/trace/trace.go
@@ -137,7 +137,7 @@ func (t *trace) parseSpec() {
 }
 
 // querySidxForTraceIDs queries sidx instances to get ordered trace IDs.
-func (t *trace) querySidxForTraceIDs(ctx context.Context, sidxInstances 
[]sidx.SIDX, tqo model.TraceQueryOptions) ([]string, error) {
+func (t *trace) querySidxForTraceIDs(ctx context.Context, sidxInstances 
[]sidx.SIDX, tqo model.TraceQueryOptions, seriesIDs []common.SeriesID) 
([]string, error) {
        // Convert TraceQueryOptions to sidx.QueryRequest
        req := sidx.QueryRequest{
                Filter:         tqo.SkippingFilter,
@@ -158,9 +158,12 @@ func (t *trace) querySidxForTraceIDs(ctx context.Context, 
sidxInstances []sidx.S
                req.MaxKey = &maxKey
        }
 
-       // For now, use all series IDs (this could be optimized further)
-       // TODO: Consider filtering by relevant series IDs based on query 
context
-       req.SeriesIDs = []common.SeriesID{1} // Placeholder - should be 
dynamically determined
+       // Use the provided series IDs for targeted querying
+       if len(seriesIDs) > 0 {
+               req.SeriesIDs = seriesIDs
+       } else {
+               req.SeriesIDs = []common.SeriesID{1}
+       }
 
        // Query multiple sidx instances
        response, err := sidx.QueryMultipleSIDX(ctx, sidxInstances, req)
diff --git a/pkg/query/logical/trace/trace_plan_local.go 
b/pkg/query/logical/trace/trace_plan_local.go
index d73af754..71aebf07 100644
--- a/pkg/query/logical/trace/trace_plan_local.go
+++ b/pkg/query/logical/trace/trace_plan_local.go
@@ -22,6 +22,7 @@ import (
        "fmt"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/iter"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -49,6 +50,7 @@ type localScan struct {
        projectionTags    *model.TagProjection
        timeRange         timestamp.TimeRange
        projectionTagRefs [][]*logical.TagRef
+       entities          [][]*modelv1.TagValue
        traceIDs          []string
        maxTraceSize      int
 }
@@ -90,6 +92,7 @@ func (i *localScan) Execute(ctx context.Context) 
(iter.Iterator[model.TraceResul
                        SkippingFilter: i.skippingFilter,
                        Order:          orderBy,
                        TagProjection:  i.projectionTags,
+                       Entities:       i.entities,
                        MaxTraceSize:   i.maxTraceSize,
                        TraceIDs:       i.traceIDs,
                }); err != nil {
diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go 
b/pkg/query/logical/trace/trace_plan_tag_filter.go
index f868d49a..8bb3aeca 100644
--- a/pkg/query/logical/trace/trace_plan_tag_filter.go
+++ b/pkg/query/logical/trace/trace_plan_tag_filter.go
@@ -55,11 +55,13 @@ func (uis *unresolvedTraceTagFilter) Analyze(s 
logical.Schema) (logical.Plan, er
        var err error
        var conditionTagNames []string
        var traceIDs []string
-       // For trace, we only use skipping filter (no inverted filter or 
entities)
-       ctx.skippingFilter, conditionTagNames, traceIDs, err = 
buildTraceFilter(uis.criteria, s, entityDict, uis.traceIDTagName)
+       var entities [][]*modelv1.TagValue
+       // For trace, we use skipping filter and capture entities for query 
optimization
+       ctx.skippingFilter, entities, conditionTagNames, traceIDs, err = 
buildTraceFilter(uis.criteria, s, entityDict, uis.traceIDTagName)
        if err != nil {
                return nil, err
        }
+       ctx.entities = entities
 
        // Initialize projectionTags even if no explicit projection tags are 
provided
        ctx.projectionTags = &model.TagProjection{
@@ -117,6 +119,7 @@ func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx 
*traceAnalyzeContext
                projectionTags:    ctx.projectionTags,
                metadata:          uis.metadata,
                skippingFilter:    ctx.skippingFilter,
+               entities:          ctx.entities,
                l:                 logger.GetLogger("query", "trace", 
"local-scan"),
                ec:                ec,
                traceIDs:          traceIDs,
@@ -128,6 +131,7 @@ type traceAnalyzeContext struct {
        skippingFilter index.Filter
        projectionTags *model.TagProjection
        projTagsRefs   [][]*logical.TagRef
+       entities       [][]*modelv1.TagValue
 }
 
 func newTraceAnalyzerContext(s logical.Schema) *traceAnalyzeContext {
@@ -151,9 +155,11 @@ func deduplicateStrings(strings []string) []string {
 
 // buildTraceFilter builds a filter for trace queries and returns both the 
filter and collected tag names.
 // Unlike stream, trace only needs skipping filter.
-func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict 
map[string]int, traceIDTagName string) (index.Filter, []string, []string, 
error) {
+func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict 
map[string]int,
+       traceIDTagName string,
+) (index.Filter, [][]*modelv1.TagValue, []string, []string, error) {
        if criteria == nil {
-               return nil, nil, nil, nil
+               return nil, nil, nil, nil, nil
        }
        // Create a map of valid tag names from the schema
        tagNames := make(map[string]bool)
@@ -162,8 +168,8 @@ func buildTraceFilter(criteria *modelv1.Criteria, s 
logical.Schema, entityDict m
                tagNames[tagName] = true
        }
 
-       filter, _, collectedTagNames, traceIDs, err := buildFilter(criteria, 
tagNames, entityDict, nil, traceIDTagName)
-       return filter, collectedTagNames, traceIDs, err
+       filter, entities, collectedTagNames, traceIDs, err := 
buildFilter(criteria, tagNames, entityDict, nil, traceIDTagName)
+       return filter, entities, collectedTagNames, traceIDs, err
 }
 
 var (
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index dda510ec..6a504bc1 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -322,6 +322,7 @@ type TraceQueryOptions struct {
        Order          *index.OrderBy
        TagProjection  *TagProjection
        Name           string
+       Entities       [][]*modelv1.TagValue
        TraceIDs       []string
        MaxTraceSize   int
 }
@@ -333,6 +334,7 @@ func (t *TraceQueryOptions) Reset() {
        t.SkippingFilter = nil
        t.Order = nil
        t.TagProjection = nil
+       t.Entities = nil
        t.TraceIDs = nil
        t.MaxTraceSize = 0
 }
@@ -344,6 +346,7 @@ func (t *TraceQueryOptions) CopyFrom(other 
*TraceQueryOptions) {
        t.SkippingFilter = other.SkippingFilter
        t.Order = other.Order
        t.TagProjection = other.TagProjection
+       t.Entities = other.Entities
        t.MaxTraceSize = other.MaxTraceSize
 }
 

Reply via email to