This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 065d7df8 Add the measure query trace system (#476)
065d7df8 is described below

commit 065d7df820ca240c4c91bab7cb614d5d9693c643
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jun 26 07:00:45 2024 +0800

    Add the measure query trace system (#476)
---
 CHANGES.md                                         |   1 +
 api/proto/banyandb/common/v1/trace.proto           |   2 +
 banyand/dquery/dquery.go                           |  15 +-
 banyand/dquery/measure.go                          |  60 ++++++--
 banyand/internal/storage/index.go                  |  85 +++++++++--
 banyand/liaison/grpc/measure.go                    |  37 +++--
 banyand/measure/introducer.go                      |  15 +-
 banyand/measure/query.go                           |  90 ++++++------
 banyand/measure/trace.go                           |  74 ++++++++++
 banyand/metadata/client.go                         |   6 +-
 banyand/metadata/embeddedserver/server.go          |   6 +-
 banyand/metadata/metadata_test.go                  |  14 +-
 banyand/query/processor.go                         |  85 +++++++++--
 banyand/stream/introducer.go                       |  15 +-
 docs/api-reference.md                              |   1 +
 pkg/query/doc.go                                   |  19 +++
 pkg/query/logical/index_filter.go                  |  24 +++-
 .../logical/measure/measure_plan_distributed.go    |  37 +++--
 .../measure/measure_plan_indexscan_local.go        |  35 ++++-
 pkg/query/tracer.go                                | 155 +++++++++++++++++++++
 pkg/query/tracer_test.go                           | 131 +++++++++++++++++
 pkg/test/stream/etcd.go                            |   3 +
 test/stress/trace/docker-compose-cluster.yaml      |   6 +-
 23 files changed, 791 insertions(+), 125 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 13a0e2b6..82ca6e08 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
 
 - Check unregistered nodes in background.
 - Improve sorting performance of stream.
+- Add the measure query trace.
 
 ### Bugs
 
diff --git a/api/proto/banyandb/common/v1/trace.proto b/api/proto/banyandb/common/v1/trace.proto
index 73a4351d..b2dfc870 100644
--- a/api/proto/banyandb/common/v1/trace.proto
+++ b/api/proto/banyandb/common/v1/trace.proto
@@ -48,6 +48,8 @@ message Span {
   string message = 5;
   // children is a list of child spans of the span.
   repeated Span children = 6;
+  // duration is the duration of the span.
+  int64 duration = 7;
 }
 
 // Tag is the key-value pair of a span.
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index c7e1454d..d378febb 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -20,9 +20,11 @@ package dquery
 
 import (
        "context"
+       "errors"
 
        "go.uber.org/multierr"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
@@ -42,13 +44,14 @@ const (
 var _ run.Service = (*queryService)(nil)
 
 type queryService struct {
-       log         *logger.Logger
        metaService metadata.Repo
+       pipeline    queue.Server
+       log         *logger.Logger
        sqp         *streamQueryProcessor
        mqp         *measureQueryProcessor
        tqp         *topNQueryProcessor
        closer      *run.Closer
-       pipeline    queue.Server
+       nodeID      string
 }
 
 // NewService return a new query service.
@@ -78,7 +81,13 @@ func (q *queryService) Name() string {
        return moduleName
 }
 
-func (q *queryService) PreRun(_ context.Context) error {
+func (q *queryService) PreRun(ctx context.Context) error {
+       val := ctx.Value(common.ContextNodeKey)
+       if val == nil {
+               return errors.New("node id is empty")
+       }
+       node := val.(common.Node)
+       q.nodeID = node.NodeID
        q.log = logger.GetLogger(moduleName)
        q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log)
        q.mqp.measureService = measure.NewPortableRepository(q.metaService, 
q.log)
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index 5f1c515a..be6a81d5 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -19,6 +19,8 @@ package dquery
 
 import (
        "context"
+       "errors"
+       "fmt"
        "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
@@ -27,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        logical_measure 
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
 )
@@ -39,7 +42,8 @@ type measureQueryProcessor struct {
 
 func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
-       now := time.Now().UnixNano()
+       n := time.Now()
+       now := n.UnixNano()
        if !ok {
                resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("invalid event data type"))
                return
@@ -79,8 +83,28 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        if e := ml.Debug(); e.Enabled() {
                e.Str("plan", plan.String()).Msg("query plan")
        }
-
-       mIterator, err := 
plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(context.Background(),
 &distributedContext{
+       ctx := context.Background()
+       var tracer *query.Tracer
+       var span *query.Span
+       if queryCriteria.Trace {
+               tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+               span, ctx = tracer.StartSpan(ctx, "distributed-%s", 
p.queryService.nodeID)
+               span.Tag("plan", plan.String())
+               defer func() {
+                       data := resp.Data()
+                       switch d := data.(type) {
+                       case *measurev1.QueryResponse:
+                               d.Trace = tracer.ToProto()
+                       case common.Error:
+                               span.Error(errors.New(d.Msg()))
+                               resp = bus.NewMessage(bus.MessageID(now), 
&measurev1.QueryResponse{Trace: tracer.ToProto()})
+                       default:
+                               panic("unexpected data type")
+                       }
+                       span.Stop()
+               }()
+       }
+       mIterator, err := 
plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(ctx,
 &distributedContext{
                Broadcaster: p.broadcaster,
                timeRange:   queryCriteria.TimeRange,
        }))
@@ -92,18 +116,34 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        defer func() {
                if err = mIterator.Close(); err != nil {
                        ml.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to close the query plan")
+                       if span != nil {
+                               span.Error(fmt.Errorf("fail to close the query 
plan: %w", err))
+                       }
                }
        }()
        result := make([]*measurev1.DataPoint, 0)
-       for mIterator.Next() {
-               current := mIterator.Current()
-               if len(current) > 0 {
-                       result = append(result, current[0])
+       func() {
+               var r int
+               if tracer != nil {
+                       iterSpan, _ := tracer.StartSpan(ctx, "iterator")
+                       defer func() {
+                               iterSpan.Tag("rounds", fmt.Sprintf("%d", r))
+                               iterSpan.Tag("size", fmt.Sprintf("%d", 
len(result)))
+                               iterSpan.Stop()
+                       }()
                }
-       }
+               for mIterator.Next() {
+                       r++
+                       current := mIterator.Current()
+                       if len(current) > 0 {
+                               result = append(result, current[0])
+                       }
+               }
+       }()
+       qr := &measurev1.QueryResponse{DataPoints: result}
        if e := ml.Debug(); e.Enabled() {
-               e.RawJSON("ret", 
logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure")
+               e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
        }
-       resp = bus.NewMessage(bus.MessageID(now), 
&measurev1.QueryResponse{DataPoints: result})
+       resp = bus.NewMessage(bus.MessageID(now), qr)
        return
 }
diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index 4dcac9d5..cb926e0e 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -38,6 +38,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -88,15 +90,26 @@ func (s *seriesIndex) Write(docs index.Documents) error {
 
 var rangeOpts = index.RangeOpts{}
 
-func (s *seriesIndex) searchPrimary(ctx context.Context, series 
[]*pbv1.Series) (pbv1.SeriesList, error) {
+func (s *seriesIndex) searchPrimary(ctx context.Context, series 
[]*pbv1.Series) (sl pbv1.SeriesList, err error) {
        seriesMatchers := make([]index.SeriesMatcher, len(series))
        for i := range series {
-               var err error
                seriesMatchers[i], err = 
convertEntityValuesToSeriesMatcher(series[i])
                if err != nil {
                        return nil, err
                }
        }
+       tracer := query.GetTracer(ctx)
+       var span *query.Span
+       if tracer != nil {
+               span, _ = tracer.StartSpan(ctx, "seriesIndex.searchPrimary")
+               span.Tagf("matchers", "%v", seriesMatchers)
+               defer func() {
+                       if err != nil {
+                               span.Error(err)
+                       }
+                       span.Stop()
+               }()
+       }
        ss, err := s.store.Search(ctx, seriesMatchers)
        if err != nil {
                return nil, err
@@ -105,6 +118,9 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series)
        if err != nil {
                return nil, errors.WithMessagef(err, "failed to convert index 
series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss))
        }
+       if span != nil {
+               span.Tagf("matched", "%d", len(result))
+       }
        return result, nil
 }
 
@@ -174,28 +190,54 @@ func convertIndexSeriesToSeriesList(indexSeries []index.Series) (pbv1.SeriesList
        return seriesList, nil
 }
 
-func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, 
filter index.Filter, order *pbv1.OrderBy, preloadSize int) (pbv1.SeriesList, 
error) {
+func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, 
filter index.Filter, order *pbv1.OrderBy, preloadSize int) (sl pbv1.SeriesList, 
err error) {
+       tracer := query.GetTracer(ctx)
+       if tracer != nil {
+               var span *query.Span
+               span, ctx = tracer.StartSpan(ctx, "seriesIndex.Search")
+               defer func() {
+                       if err != nil {
+                               span.Error(err)
+                       }
+                       span.Stop()
+               }()
+       }
        seriesList, err := s.searchPrimary(ctx, series)
        if err != nil {
                return nil, err
        }
 
        pl := seriesList.ToList()
-       if filter != nil {
+       if filter != nil && filter != logical.ENode {
                var plFilter posting.List
                // TODO: merge searchPrimary and filter
-               plFilter, err = filter.Execute(func(_ 
databasev1.IndexRule_Type) (index.Searcher, error) {
-                       return s.store, nil
-               }, 0)
+               func() {
+                       if tracer != nil {
+                               span, _ := tracer.StartSpan(ctx, "filter")
+                               span.Tag("exp", filter.String())
+                               defer func() {
+                                       if err != nil {
+                                               span.Error(err)
+                                       } else {
+                                               span.Tagf("matched", "%d", 
plFilter.Len())
+                                               span.Tagf("total", "%d", 
pl.Len())
+                                       }
+                                       span.Stop()
+                               }()
+                       }
+                       if plFilter, err = filter.Execute(func(_ 
databasev1.IndexRule_Type) (index.Searcher, error) {
+                               return s.store, nil
+                       }, 0); err != nil {
+                               return
+                       }
+                       if plFilter == nil {
+                               return
+                       }
+                       err = pl.Intersect(plFilter)
+               }()
                if err != nil {
                        return nil, err
                }
-               if plFilter == nil {
-                       return pbv1.SeriesList{}, nil
-               }
-               if err = pl.Intersect(plFilter); err != nil {
-                       return nil, err
-               }
        }
 
        if order == nil || order.Index == nil {
@@ -205,6 +247,17 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter
        fieldKey := index.FieldKey{
                IndexRuleID: order.Index.GetMetadata().Id,
        }
+       var span *query.Span
+       if tracer != nil {
+               span, _ = tracer.StartSpan(ctx, "sort")
+               span.Tagf("preload", "%d", preloadSize)
+               defer func() {
+                       if err != nil {
+                               span.Error(err)
+                       }
+                       span.Stop()
+               }()
+       }
        // TODO:// merge searchPrimary and sort
        iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort, 
preloadSize)
        if err != nil {
@@ -215,7 +268,9 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter
        }()
 
        var sortedSeriesList pbv1.SeriesList
+       var r int
        for iter.Next() {
+               r++
                docID := iter.Val().DocID
                if !pl.Contains(docID) {
                        continue
@@ -225,6 +280,10 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter
                        return nil, err
                }
        }
+       if span != nil {
+               span.Tagf("rounds", "%d", r)
+               span.Tagf("size", "%d", len(sortedSeriesList))
+       }
        return sortedSeriesList, err
 }
 
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 6b480738..175c687c 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -34,8 +34,10 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/accesslog"
        "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -145,21 +147,36 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
 
 var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints: 
make([]*measurev1.DataPoint, 0)}
 
-func (ms *measureService) Query(_ context.Context, req 
*measurev1.QueryRequest) (*measurev1.QueryResponse, error) {
-       if err := timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
+func (ms *measureService) Query(_ context.Context, req 
*measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
+       if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid 
:%s", req.GetTimeRange(), err)
        }
-       message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req)
-       feat, errQuery := ms.broadcaster.Publish(data.TopicMeasureQuery, 
message)
-       if errQuery != nil {
-               return nil, errQuery
+       now := time.Now()
+       if req.Trace {
+               ctx := context.TODO()
+               tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+               span, _ := tracer.StartSpan(ctx, "measure-grpc")
+               span.Tag("request", convert.BytesToString(logger.Proto(req)))
+               defer func() {
+                       if err != nil {
+                               span.Error(err)
+                       } else {
+                               span.AddSubTrace(resp.Trace)
+                               resp.Trace = tracer.ToProto()
+                       }
+                       span.Stop()
+               }()
        }
-       msg, errFeat := feat.Get()
-       if errFeat != nil {
-               if errors.Is(errFeat, io.EOF) {
+       feat, err := ms.broadcaster.Publish(data.TopicMeasureQuery, 
bus.NewMessage(bus.MessageID(now.UnixNano()), req))
+       if err != nil {
+               return nil, err
+       }
+       msg, err := feat.Get()
+       if err != nil {
+               if errors.Is(err, io.EOF) {
                        return emptyMeasureQueryResponse, nil
                }
-               return nil, errFeat
+               return nil, err
        }
        data := msg.Data()
        switch d := data.(type) {
diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go
index 8ba0118a..7fce11d1 100644
--- a/banyand/measure/introducer.go
+++ b/banyand/measure/introducer.go
@@ -40,11 +40,12 @@ func generateIntroduction() *introduction {
        if v == nil {
                return &introduction{}
        }
-       return v.(*introduction)
+       i := v.(*introduction)
+       i.reset()
+       return i
 }
 
 func releaseIntroduction(i *introduction) {
-       i.reset()
        introductionPool.Put(i)
 }
 
@@ -69,11 +70,12 @@ func generateFlusherIntroduction() *flusherIntroduction {
                        flushed: make(map[uint64]*partWrapper),
                }
        }
-       return v.(*flusherIntroduction)
+       i := v.(*flusherIntroduction)
+       i.reset()
+       return i
 }
 
 func releaseFlusherIntroduction(i *flusherIntroduction) {
-       i.reset()
        flusherIntroductionPool.Put(i)
 }
 
@@ -100,11 +102,12 @@ func generateMergerIntroduction() *mergerIntroduction {
        if v == nil {
                return &mergerIntroduction{}
        }
-       return v.(*mergerIntroduction)
+       i := v.(*mergerIntroduction)
+       i.reset()
+       return i
 }
 
 func releaseMergerIntroduction(i *mergerIntroduction) {
-       i.reset()
        mergerIntroductionPool.Put(i)
 }
 
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 168a5663..c153a301 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -63,17 +63,16 @@ type queryOptions struct {
        maxTimestamp int64
 }
 
-func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) 
(pbv1.MeasureQueryResult, error) {
+func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) 
(mqr pbv1.MeasureQueryResult, err error) {
        if mqo.TimeRange == nil || len(mqo.Entities) < 1 {
                return nil, errors.New("invalid query options: timeRange and 
series are required")
        }
        if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 {
                return nil, errors.New("invalid query options: tagProjection or 
fieldProjection is required")
        }
-       var result queryResult
        db := s.databaseSupplier.SupplyTSDB()
        if db == nil {
-               return &result, nil
+               return mqr, nil
        }
        tsdb := db.(storage.TSDB[*tsTable, option])
        tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange)
@@ -95,7 +94,7 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1
                return nil, err
        }
        if len(sl) < 1 {
-               return &result, nil
+               return mqr, nil
        }
        var sids []common.SeriesID
        for i := range sl {
@@ -108,6 +107,7 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1
                maxTimestamp:        mqo.TimeRange.End.UnixNano(),
        }
        var n int
+       var result queryResult
        for i := range tabWrappers {
                s := tabWrappers[i].Table().currentSnapshot()
                if s == nil {
@@ -120,48 +120,58 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1
                }
                result.snapshots = append(result.snapshots, s)
        }
-       bma := generateBlockMetadataArray()
-       defer releaseBlockMetadataArray(bma)
-       // TODO: cache tstIter
-       var tstIter tstIter
-       defer tstIter.reset()
-       originalSids := make([]common.SeriesID, len(sids))
-       copy(originalSids, sids)
-       sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
-       tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
-       if tstIter.Error() != nil {
-               return nil, fmt.Errorf("cannot init tstIter: %w", 
tstIter.Error())
-       }
-       projectedEntityOffsets, tagProjectionOnPart := s.parseTagProjection(qo, 
&result)
-       result.tagProjection = qo.TagProjection
-       qo.TagProjection = tagProjectionOnPart
-       for tstIter.nextBlock() {
-               bc := generateBlockCursor()
-               p := tstIter.piHeap[0]
-
-               seriesID := p.curBlock.seriesID
-               if result.entityValues != nil && result.entityValues[seriesID] 
== nil {
-                       for i := range sl {
-                               if sl[i].ID == seriesID {
-                                       tag := 
make(map[string]*modelv1.TagValue)
-                                       for name, offset := range 
projectedEntityOffsets {
-                                               tag[name] = 
sl[i].EntityValues[offset]
+
+       func() {
+               bma := generateBlockMetadataArray()
+               defer releaseBlockMetadataArray(bma)
+               defFn := startBlockScanSpan(ctx, len(sids), parts, &result)
+               defer defFn()
+               // TODO: cache tstIter
+               var tstIter tstIter
+               defer tstIter.reset()
+               originalSids := make([]common.SeriesID, len(sids))
+               copy(originalSids, sids)
+               sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] 
})
+               tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
+               if tstIter.Error() != nil {
+                       err = fmt.Errorf("cannot init tstIter: %w", 
tstIter.Error())
+                       return
+               }
+               projectedEntityOffsets, tagProjectionOnPart := 
s.parseTagProjection(qo, &result)
+               result.tagProjection = qo.TagProjection
+               qo.TagProjection = tagProjectionOnPart
+
+               for tstIter.nextBlock() {
+                       bc := generateBlockCursor()
+                       p := tstIter.piHeap[0]
+
+                       seriesID := p.curBlock.seriesID
+                       if result.entityValues != nil && 
result.entityValues[seriesID] == nil {
+                               for i := range sl {
+                                       if sl[i].ID == seriesID {
+                                               tag := 
make(map[string]*modelv1.TagValue)
+                                               for name, offset := range 
projectedEntityOffsets {
+                                                       tag[name] = 
sl[i].EntityValues[offset]
+                                               }
+                                               result.entityValues[seriesID] = 
tag
                                        }
-                                       result.entityValues[seriesID] = tag
                                }
                        }
+                       bc.init(p.p, p.curBlock, qo)
+                       result.data = append(result.data, bc)
                }
-               bc.init(p.p, p.curBlock, qo)
-               result.data = append(result.data, bc)
-       }
-       if tstIter.Error() != nil {
-               return nil, fmt.Errorf("cannot iterate tstIter: %w", 
tstIter.Error())
+               if tstIter.Error() != nil {
+                       err = fmt.Errorf("cannot iterate tstIter: %w", 
tstIter.Error())
+               }
+               result.sidToIndex = make(map[common.SeriesID]int)
+               for i, si := range originalSids {
+                       result.sidToIndex[si] = i
+               }
+       }()
+       if err != nil {
+               return nil, err
        }
 
-       result.sidToIndex = make(map[common.SeriesID]int)
-       for i, si := range originalSids {
-               result.sidToIndex[si] = i
-       }
        if mqo.Order == nil {
                result.ascTS = true
        } else if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
diff --git a/banyand/measure/trace.go b/banyand/measure/trace.go
new file mode 100644
index 00000000..ccf51f3a
--- /dev/null
+++ b/banyand/measure/trace.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "github.com/dustin/go-humanize"
+
+       "github.com/apache/skywalking-banyandb/pkg/query"
+)
+
+const (
+       partMetadataHeader = "MinTimestamp, MaxTimestamp, CompressionSize, 
UncompressedSize, TotalCount, BlocksCount"
+       blockHeader        = "PartID, SeriesID, MinTimestamp, MaxTimestamp, 
Count, UncompressedSize"
+)
+
+func (pm *partMetadata) String() string {
+       minTimestamp := time.Unix(0, pm.MinTimestamp).Format(time.Stamp)
+       maxTimestamp := time.Unix(0, pm.MaxTimestamp).Format(time.Stamp)
+
+       return fmt.Sprintf("%s, %s, %s, %s, %s, %s",
+               minTimestamp, maxTimestamp, 
humanize.Bytes(pm.CompressedSizeBytes),
+               humanize.Bytes(pm.UncompressedSizeBytes), 
humanize.Comma(int64(pm.TotalCount)),
+               humanize.Comma(int64(pm.BlocksCount)))
+}
+
+func (bc *blockCursor) String() string {
+       minTimestamp := time.Unix(0, bc.minTimestamp).Format(time.Stamp)
+       maxTimestamp := time.Unix(0, bc.maxTimestamp).Format(time.Stamp)
+
+       return fmt.Sprintf("%d, %d, %s, %s, %d, %s",
+               bc.p.partMetadata.ID, bc.bm.seriesID, minTimestamp, 
maxTimestamp, bc.bm.count, humanize.Bytes(bc.bm.uncompressedSizeBytes))
+}
+
+func startBlockScanSpan(ctx context.Context, sids int, parts []*part, qr 
*queryResult) func() {
+       tracer := query.GetTracer(ctx)
+       if tracer == nil {
+               return func() {}
+       }
+
+       span, _ := tracer.StartSpan(ctx, "scan-blocks")
+       span.Tag("series_num", fmt.Sprintf("%d", sids))
+       span.Tag("part_header", partMetadataHeader)
+       for i := range parts {
+               span.Tag(fmt.Sprintf("part_%d_%s", parts[i].partMetadata.ID, 
parts[i].path),
+                       parts[i].partMetadata.String())
+       }
+
+       return func() {
+               span.Tag("block_header", blockHeader)
+               for i := range qr.data {
+                       span.Tag(fmt.Sprintf("block_%d", i), 
qr.data[i].String())
+               }
+               span.Stop()
+       }
+}
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index c60ef7d0..61ba8164 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -150,8 +150,10 @@ func (s *clientService) Serve() run.StopNotify {
 func (s *clientService) GracefulStop() {
        s.closer.Done()
        s.closer.CloseThenWait()
-       if err := s.schemaRegistry.Close(); err != nil {
-               logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to 
close schema registry")
+       if s.schemaRegistry != nil {
+               if err := s.schemaRegistry.Close(); err != nil {
+                       logger.GetLogger(s.Name()).Error().Err(err).Msg("failed 
to close schema registry")
+               }
        }
 }
 
diff --git a/banyand/metadata/embeddedserver/server.go b/banyand/metadata/embeddedserver/server.go
index 3fbe9f6c..44bd4864 100644
--- a/banyand/metadata/embeddedserver/server.go
+++ b/banyand/metadata/embeddedserver/server.go
@@ -86,8 +86,10 @@ func (s *server) Serve() run.StopNotify {
 
 func (s *server) GracefulStop() {
        s.Service.GracefulStop()
-       s.metaServer.Close()
-       <-s.metaServer.StopNotify()
+       if s.metaServer != nil {
+               s.metaServer.Close()
+               <-s.metaServer.StopNotify()
+       }
 }
 
 // NewService returns a new metadata repository Service.
diff --git a/banyand/metadata/metadata_test.go b/banyand/metadata/metadata_test.go
index 09a5bbaf..981ab416 100644
--- a/banyand/metadata/metadata_test.go
+++ b/banyand/metadata/metadata_test.go
@@ -22,6 +22,7 @@ import (
        "testing"
 
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -39,29 +40,30 @@ func Test_service_RulesBySubject(t *testing.T) {
                subject *commonv1.Metadata
        }
        is := assert.New(t)
+       req := require.New(t)
        is.NoError(logger.Init(logger.Logging{
                Env:   "dev",
                Level: flags.LogLevel,
        }))
        ctx := context.TODO()
        s, _ := embeddedserver.NewService(ctx)
-       is.NotNil(s)
+       req.NotNil(s)
        rootDir, deferFn, err := testhelper.NewSpace()
-       is.NoError(err)
+       req.NoError(err)
        err = s.FlagSet().Parse([]string{"--metadata-root-path=" + rootDir})
-       is.NoError(err)
-       is.NoError(s.Validate())
+       req.NoError(err)
+       req.NoError(s.Validate())
        ctx = context.WithValue(ctx, common.ContextNodeKey, common.Node{NodeID: 
"test"})
        ctx = context.WithValue(ctx, common.ContextNodeRolesKey, 
[]databasev1.Role{databasev1.Role_ROLE_META})
        err = s.PreRun(ctx)
-       is.NoError(err)
+       req.NoError(err)
        defer func() {
                s.GracefulStop()
                deferFn()
        }()
 
        err = test.PreloadSchema(ctx, s.SchemaRegistry())
-       is.NoError(err)
+       req.NoError(err)
 
        tests := []struct {
                name    string
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index ccb039e9..5a361525 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -19,6 +19,9 @@ package query
 
 import (
        "context"
+       "errors"
+       "fmt"
+       "runtime/debug"
        "time"
 
        "go.uber.org/multierr"
@@ -34,6 +37,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        logical_measure 
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
        logical_stream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
@@ -52,13 +56,13 @@ var (
 )
 
 type queryService struct {
-       log *logger.Logger
-       // TODO: remove the metaService once 
https://github.com/apache/skywalking/issues/10121 is fixed.
        metaService metadata.Repo
        pipeline    queue.Server
+       log         *logger.Logger
        sqp         *streamQueryProcessor
        mqp         *measureQueryProcessor
        tqp         *topNQueryProcessor
+       nodeID      string
 }
 
 type streamQueryProcessor struct {
@@ -76,6 +80,12 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        if p.log.Debug().Enabled() {
                p.log.Debug().RawJSON("criteria", 
logger.Proto(queryCriteria)).Msg("received a query request")
        }
+       defer func() {
+               if err := recover(); err != nil {
+                       p.log.Error().Interface("err", err).RawJSON("req", 
logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
+                       resp = 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
+               }
+       }()
        // TODO: support multiple groups
        if len(queryCriteria.Groups) > 1 {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("only 
support one group in the query request"))
@@ -124,11 +134,18 @@ type measureQueryProcessor struct {
 
 func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
-       now := time.Now().UnixNano()
+       n := time.Now()
+       now := n.UnixNano()
        if !ok {
                resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("invalid event data type"))
                return
        }
+       defer func() {
+               if err := recover(); err != nil {
+                       p.log.Error().Interface("err", err).RawJSON("req", 
logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
+                       resp = 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
+               }
+       }()
        // TODO: support multiple groups
        if len(queryCriteria.Groups) > 1 {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("only 
support one group in the query request"))
@@ -160,12 +177,33 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to analyze the query request for measure %s: %v", meta.GetName(), err))
                return
        }
+       ctx := context.Background()
+       var tracer *query.Tracer
+       var span *query.Span
+       if queryCriteria.Trace {
+               tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+               span, ctx = tracer.StartSpan(ctx, "data-%s", 
p.queryService.nodeID)
+               span.Tag("plan", plan.String())
+               defer func() {
+                       data := resp.Data()
+                       switch d := data.(type) {
+                       case *measurev1.QueryResponse:
+                               d.Trace = tracer.ToProto()
+                       case common.Error:
+                               span.Error(errors.New(d.Msg()))
+                               resp = bus.NewMessage(bus.MessageID(now), 
&measurev1.QueryResponse{Trace: tracer.ToProto()})
+                       default:
+                               panic("unexpected data type")
+                       }
+                       span.Stop()
+               }()
+       }
 
        if e := ml.Debug(); e.Enabled() {
                e.Str("plan", plan.String()).Msg("query plan")
        }
 
-       mIterator, err := 
plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(context.Background(),
 ec))
+       mIterator, err := 
plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(ctx,
 ec))
        if err != nil {
                ml.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to close the query plan")
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to execute the query plan for measure %s: %v", meta.GetName(), err))
@@ -174,19 +212,36 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        defer func() {
                if err = mIterator.Close(); err != nil {
                        ml.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to close the query plan")
+                       if span != nil {
+                               span.Error(fmt.Errorf("fail to close the query 
plan: %w", err))
+                       }
                }
        }()
+
        result := make([]*measurev1.DataPoint, 0)
-       for mIterator.Next() {
-               current := mIterator.Current()
-               if len(current) > 0 {
-                       result = append(result, current[0])
+       func() {
+               var r int
+               if tracer != nil {
+                       iterSpan, _ := tracer.StartSpan(ctx, "iterator")
+                       defer func() {
+                               iterSpan.Tag("rounds", fmt.Sprintf("%d", r))
+                               iterSpan.Tag("size", fmt.Sprintf("%d", 
len(result)))
+                               iterSpan.Stop()
+                       }()
                }
-       }
+               for mIterator.Next() {
+                       r++
+                       current := mIterator.Current()
+                       if len(current) > 0 {
+                               result = append(result, current[0])
+                       }
+               }
+       }()
+       qr := &measurev1.QueryResponse{DataPoints: result}
        if e := ml.Debug(); e.Enabled() {
-               e.RawJSON("ret", 
logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure")
+               e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
        }
-       resp = bus.NewMessage(bus.MessageID(now), 
&measurev1.QueryResponse{DataPoints: result})
+       resp = bus.NewMessage(bus.MessageID(now), qr)
        return
 }
 
@@ -194,7 +249,13 @@ func (q *queryService) Name() string {
        return moduleName
 }
 
-func (q *queryService) PreRun(_ context.Context) error {
+func (q *queryService) PreRun(ctx context.Context) error {
+       val := ctx.Value(common.ContextNodeKey)
+       if val == nil {
+               return errors.New("node id is empty")
+       }
+       node := val.(common.Node)
+       q.nodeID = node.NodeID
        q.log = logger.GetLogger(moduleName)
        return multierr.Combine(
                q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go
index cd215263..76c6e659 100644
--- a/banyand/stream/introducer.go
+++ b/banyand/stream/introducer.go
@@ -40,11 +40,12 @@ func generateIntroduction() *introduction {
        if v == nil {
                return &introduction{}
        }
-       return v.(*introduction)
+       intro := v.(*introduction)
+       intro.reset()
+       return intro
 }
 
 func releaseIntroduction(i *introduction) {
-       i.reset()
        introductionPool.Put(i)
 }
 
@@ -69,11 +70,12 @@ func generateFlusherIntroduction() *flusherIntroduction {
                        flushed: make(map[uint64]*partWrapper),
                }
        }
-       return v.(*flusherIntroduction)
+       fi := v.(*flusherIntroduction)
+       fi.reset()
+       return fi
 }
 
 func releaseFlusherIntroduction(i *flusherIntroduction) {
-       i.reset()
        flusherIntroductionPool.Put(i)
 }
 
@@ -100,11 +102,12 @@ func generateMergerIntroduction() *mergerIntroduction {
        if v == nil {
                return &mergerIntroduction{}
        }
-       return v.(*mergerIntroduction)
+       mi := v.(*mergerIntroduction)
+       mi.reset()
+       return mi
 }
 
 func releaseMergerIntroduction(i *mergerIntroduction) {
-       i.reset()
        mergerIntroductionPool.Put(i)
 }
 
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 2c2522f1..c4107a80 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -416,6 +416,7 @@ Span is the basic unit of a trace.
 | tags | [Tag](#banyandb-common-v1-Tag) | repeated | tags is a list of tags of the span. |
 | message | [string](#string) |  | message is the message generated by the span. |
 | children | [Span](#banyandb-common-v1-Span) | repeated | children is a list of child spans of the span. |
+| duration | [int64](#int64) |  | duration is the duration of the span. |
 
 
 
diff --git a/pkg/query/doc.go b/pkg/query/doc.go
new file mode 100644
index 00000000..1c2d3bf7
--- /dev/null
+++ b/pkg/query/doc.go
@@ -0,0 +1,19 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package query provides the query common interfaces and utilities.
+package query
diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/index_filter.go
index ceafa695..8450c94e 100644
--- a/pkg/query/logical/index_filter.go
+++ b/pkg/query/logical/index_filter.go
@@ -141,28 +141,44 @@ func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, ex
                return newNot(indexRule, newEq(indexRule, expr)), 
[][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_HAVING:
                bb := expr.Bytes()
-               and := newAnd(len(bb))
+               l := len(bb)
+               if l < 1 {
+                       return ENode, [][]*modelv1.TagValue{entity}, nil
+               }
+               and := newAnd(l)
                for _, b := range bb {
                        and.append(newEq(indexRule, newBytesLiteral(b)))
                }
                return and, [][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_NOT_HAVING:
                bb := expr.Bytes()
-               and := newAnd(len(bb))
+               l := len(bb)
+               if l < 1 {
+                       return ENode, [][]*modelv1.TagValue{entity}, nil
+               }
+               and := newAnd(l)
                for _, b := range bb {
                        and.append(newEq(indexRule, newBytesLiteral(b)))
                }
                return newNot(indexRule, and), [][]*modelv1.TagValue{entity}, 
nil
        case modelv1.Condition_BINARY_OP_IN:
                bb := expr.Bytes()
-               or := newOr(len(bb))
+               l := len(bb)
+               if l < 1 {
+                       return ENode, [][]*modelv1.TagValue{entity}, nil
+               }
+               or := newOr(l)
                for _, b := range bb {
                        or.append(newEq(indexRule, newBytesLiteral(b)))
                }
                return or, [][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_NOT_IN:
                bb := expr.Bytes()
-               or := newOr(len(bb))
+               l := len(bb)
+               if l < 1 {
+                       return ENode, [][]*modelv1.TagValue{entity}, nil
+               }
+               or := newOr(l)
                for _, b := range bb {
                        or.append(newEq(indexRule, newBytesLiteral(b)))
                }
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index 31226984..bd4377a3 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -33,7 +33,9 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/iter/sort"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
@@ -155,29 +157,46 @@ type distributedPlan struct {
        maxDataPointsSize uint32
 }
 
-func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, 
error) {
+func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, 
err error) {
        dctx := executor.FromDistributedExecutionContext(ctx)
-       query := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest)
-       query.TimeRange = dctx.TimeRange()
+       queryRequest := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest)
+       queryRequest.TimeRange = dctx.TimeRange()
        if t.maxDataPointsSize > 0 {
-               query.Limit = t.maxDataPointsSize
+               queryRequest.Limit = t.maxDataPointsSize
        }
-       var allErr error
-       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery, 
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+       tracer := query.GetTracer(ctx)
+       var span *query.Span
+       if tracer != nil {
+               span, _ = tracer.StartSpan(ctx, "distributed-client")
+               queryRequest.Trace = true
+               span.Tag("request", 
convert.BytesToString(logger.Proto(queryRequest)))
+               defer func() {
+                       if err != nil {
+                               span.Error(err)
+                       } else {
+                               span.Stop()
+                       }
+               }()
+       }
+       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery, 
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest))
        if err != nil {
                return nil, err
        }
        var see []sort.Iterator[*comparableDataPoint]
        for _, f := range ff {
                if m, getErr := f.Get(); getErr != nil {
-                       allErr = multierr.Append(allErr, getErr)
+                       err = multierr.Append(err, getErr)
                } else {
                        d := m.Data()
                        if d == nil {
                                continue
                        }
+                       resp := d.(*measurev1.QueryResponse)
+                       if span != nil {
+                               span.AddSubTrace(resp.Trace)
+                       }
                        see = append(see,
-                               
newSortableElements(d.(*measurev1.QueryResponse).DataPoints,
+                               newSortableElements(resp.DataPoints,
                                        t.sortByTime, t.sortTagSpec))
                }
        }
@@ -185,7 +204,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, erro
                Iterator: sort.NewItemIter(see, t.desc),
        }
        smi.init()
-       return smi, allErr
+       return smi, err
 }
 
 func (t *distributedPlan) String() string {
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index c5de0da2..2e8d7304 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -153,6 +154,8 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, e
                orderByType = pbv1.OrderByTypeSeries
        }
        ec := executor.FromMeasureExecutionContext(ctx)
+       ctx, stop := i.startSpan(ctx, query.GetTracer(ctx), orderByType, 
orderBy)
+       defer stop(err)
        result, err := ec.Query(ctx, pbv1.MeasureQueryOptions{
                Name:            i.metadata.GetName(),
                TimeRange:       &i.timeRange,
@@ -209,6 +212,9 @@ type resultMIterator struct {
 }
 
 func (ei *resultMIterator) Next() bool {
+       if ei.result == nil {
+               return false
+       }
        ei.i++
        if ei.i < len(ei.current) {
                return true
@@ -256,6 +262,33 @@ func (ei *resultMIterator) Current() []*measurev1.DataPoint {
 }
 
 func (ei *resultMIterator) Close() error {
-       ei.result.Release()
+       if ei.result != nil {
+               ei.result.Release()
+       }
        return nil
 }
+
+func (i *localIndexScan) startSpan(ctx context.Context, tracer *query.Tracer, 
orderType pbv1.OrderByType, orderBy *pbv1.OrderBy) (context.Context, 
func(error)) {
+       if tracer == nil {
+               return ctx, func(error) {}
+       }
+
+       span, ctx := tracer.StartSpan(ctx, "indexScan-%s", i.metadata)
+       sortName := modelv1.Sort_name[int32(orderBy.Sort)]
+       switch orderType {
+       case pbv1.OrderByTypeTime:
+               span.Tag("orderBy", "time "+sortName)
+       case pbv1.OrderByTypeIndex:
+               span.Tag("orderBy", fmt.Sprintf("indexRule:%s", 
orderBy.Index.Metadata.Name))
+       case pbv1.OrderByTypeSeries:
+               span.Tag("orderBy", "series")
+       }
+       span.Tag("details", i.String())
+
+       return ctx, func(err error) {
+               if err != nil {
+                       span.Error(err)
+               }
+               span.Stop()
+       }
+}
diff --git a/pkg/query/tracer.go b/pkg/query/tracer.go
new file mode 100644
index 00000000..4d761273
--- /dev/null
+++ b/pkg/query/tracer.go
@@ -0,0 +1,155 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package query
+
+import (
+       "context"
+       "fmt"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+)
+
+var (
+       spanKey   = spanContextKey{}
+       tracerKey = tracerContextKey{}
+)
+
+type (
+       spanContextKey   struct{}
+       tracerContextKey struct{}
+)
+
+// Tracer is a simple tracer for query.
+type Tracer struct {
+       data *commonv1.Trace
+}
+
+// NewTracer creates a new tracer.
+func NewTracer(ctx context.Context, id string) (*Tracer, context.Context) {
+       tracer := GetTracer(ctx)
+       if tracer != nil {
+               return tracer, ctx
+       }
+       t := &Tracer{
+               data: &commonv1.Trace{
+                       TraceId: id,
+               },
+       }
+       return t, context.WithValue(ctx, tracerKey, t)
+}
+
+// GetTracer returns the tracer from the context.
+func GetTracer(ctx context.Context) *Tracer {
+       tv := ctx.Value(tracerKey)
+       if tv == nil {
+               return nil
+       }
+       tracer, ok := ctx.Value(tracerKey).(*Tracer)
+       if ok {
+               return tracer
+       }
+       panic(fmt.Errorf("invalid tracer context value: %v", tv))
+}
+
+// StartSpan starts a new span.
+func (t *Tracer) StartSpan(ctx context.Context, format string, args 
...interface{}) (*Span, context.Context) {
+       s := &Span{
+               data: &commonv1.Span{
+                       Message:   fmt.Sprintf(format, args...),
+                       StartTime: timestamppb.Now(),
+               },
+               tracer: t,
+       }
+       sv := ctx.Value(spanKey)
+       if sv == nil {
+               t.data.Spans = append(t.data.Spans, s.data)
+               return s, context.WithValue(ctx, spanKey, s)
+       }
+       parentSpan, ok := ctx.Value(spanKey).(*Span)
+       if ok {
+               parentSpan.addChild(s.data)
+       } else {
+               t.data.Spans = append(t.data.Spans, s.data)
+       }
+       return s, context.WithValue(ctx, spanKey, s)
+}
+
+// ToProto returns the proto representation of the tracer.
+func (t *Tracer) ToProto() *commonv1.Trace {
+       return t.data
+}
+
+// Span is a span of the tracer.
+type Span struct {
+       data   *commonv1.Span
+       tracer *Tracer
+}
+
+func (s *Span) addChild(child *commonv1.Span) {
+       s.data.Children = append(s.data.Children, child)
+       if child.Error {
+               s.Error(fmt.Errorf("sub span error"))
+       }
+}
+
+// AddSubTrace adds a sub trace to the span.
+func (s *Span) AddSubTrace(trace *commonv1.Trace) {
+       if trace == nil {
+               return
+       }
+       for i := range trace.Spans {
+               s.addChild(trace.Spans[i])
+       }
+}
+
+// Tag adds a tag to the span.
+func (s *Span) Tag(key, value string) *Span {
+       s.data.Tags = append(s.data.Tags, &commonv1.Tag{
+               Key:   key,
+               Value: value,
+       })
+       return s
+}
+
+// Tagf adds a formatted tag to the span.
+func (s *Span) Tagf(key, format string, args ...any) *Span {
+       s.data.Tags = append(s.data.Tags, &commonv1.Tag{
+               Key:   key,
+               Value: fmt.Sprintf(format, args...),
+       })
+       return s
+}
+
+// Error marks the span as an error span.
+func (s *Span) Error(err error) *Span {
+       if s.data.Error {
+               return s
+       }
+       s.data.Error = true
+       s.Tag("error_msg", err.Error())
+       s.tracer.data.Error = true
+       return s
+}
+
+// Stop stops the span.
+func (s *Span) Stop() {
+       s.data.EndTime = timestamppb.Now()
+       s.data.Duration = 
s.data.EndTime.AsTime().Sub(s.data.StartTime.AsTime()).Milliseconds()
+}
diff --git a/pkg/query/tracer_test.go b/pkg/query/tracer_test.go
new file mode 100644
index 00000000..cfa4c666
--- /dev/null
+++ b/pkg/query/tracer_test.go
@@ -0,0 +1,131 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package query
+
+import (
+       "context"
+       "fmt"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+)
+
+func TestNewTracer(t *testing.T) {
+       ctx := context.Background()
+       tracer, newCtx := NewTracer(ctx, "test-trace-id")
+       assert.NotNil(t, tracer)
+       assert.NotEqual(t, ctx, newCtx, "context should be different after 
adding a tracer")
+}
+
+func TestGetTracer(t *testing.T) {
+       ctx := context.Background()
+       var tracer *Tracer
+       tracer, ctx = NewTracer(ctx, "test-trace-id")
+
+       retrievedTracer := GetTracer(ctx)
+       assert.Equal(t, tracer, retrievedTracer, "retrieved tracer should be 
the same as the original")
+}
+
+func TestStartSpan(t *testing.T) {
+       ctx := context.Background()
+       var tracer *Tracer
+       tracer, ctx = NewTracer(ctx, "test-trace-id")
+       span, spanCtx := tracer.StartSpan(ctx, "test span %s", "1")
+
+       assert.NotNil(t, span)
+       assert.NotEqual(t, ctx, spanCtx, "context should be different after 
starting a span")
+       assert.Equal(t, "test span 1", span.data.Message)
+       assert.NotNil(t, span.data.StartTime)
+}
+
+func TestSpan_AddChild(t *testing.T) {
+       ctx := context.Background()
+       var tracer *Tracer
+       tracer, ctx = NewTracer(ctx, "test-trace-id")
+       var parentSpan *Span
+       parentSpan, ctx = tracer.StartSpan(ctx, "parent span")
+       childSpan, _ := tracer.StartSpan(ctx, "child span")
+
+       assert.Contains(t, parentSpan.data.Children, childSpan.data, "parent 
span should contain the child span")
+}
+
+func TestSpan_AddSubTrace(t *testing.T) {
+       ctx := context.Background()
+       var tracer *Tracer
+       tracer, ctx = NewTracer(ctx, "test-trace-id")
+       span, _ := tracer.StartSpan(ctx, "span")
+
+       subTrace := &commonv1.Trace{
+               Spans: []*commonv1.Span{
+                       {Message: "sub span 1"},
+                       {Message: "sub span 2"},
+               },
+       }
+
+       span.AddSubTrace(subTrace)
+
+       assert.Equal(t, 2, len(span.data.Children), "span should contain two 
children")
+       assert.Equal(t, "sub span 1", span.data.Children[0].Message)
+       assert.Equal(t, "sub span 2", span.data.Children[1].Message)
+}
+
+func TestSpan_Tag(t *testing.T) {
+       ctx := context.Background()
+       var tracer *Tracer
+       tracer, ctx = NewTracer(ctx, "test-trace-id")
+       span, _ := tracer.StartSpan(ctx, "span")
+
+       span.Tag("key", "value")
+
+       assert.Equal(t, 1, len(span.data.Tags), "span should have one tag")
+       assert.Equal(t, "key", span.data.Tags[0].Key)
+       assert.Equal(t, "value", span.data.Tags[0].Value)
+
+       span.Tagf("key", "value %s", "formatted")
+       assert.Equal(t, 2, len(span.data.Tags), "span should have two tags")
+       assert.Equal(t, "key", span.data.Tags[1].Key)
+       assert.Equal(t, "value formatted", span.data.Tags[1].Value)
+}
+
+func TestSpan_Error(t *testing.T) {
+       ctx := context.Background()
+       var tracer *Tracer
+       tracer, ctx = NewTracer(ctx, "test-trace-id")
+       span, _ := tracer.StartSpan(ctx, "span")
+
+       span.Error(fmt.Errorf("test error"))
+
+       assert.True(t, span.data.Error, "span should be marked as error")
+       assert.True(t, tracer.data.Error, "tracer should be marked as error")
+       assert.Equal(t, "test error", span.data.Tags[0].Value, "error message 
should be added as a tag")
+}
+
+func TestSpan_Stop(t *testing.T) {
+       ctx := context.Background()
+       tracer, _ := NewTracer(ctx, "test-trace-id")
+       span, _ := tracer.StartSpan(ctx, "span")
+       time.Sleep(10 * time.Millisecond)
+
+       span.Stop()
+
+       assert.NotNil(t, span.data.EndTime, "span end time should be set")
+       assert.Greater(t, span.data.Duration, int64(0), "span duration should 
be greater than 0")
+}
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index b796ef01..eb9d342b 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -45,6 +45,9 @@ var (
 
 // PreloadSchema loads schemas from files in the booting process.
 func PreloadSchema(ctx context.Context, e schema.Registry) error {
+       if e == nil {
+               return nil
+       }
        g := &commonv1.Group{}
        if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
                return err
diff --git a/test/stress/trace/docker-compose-cluster.yaml b/test/stress/trace/docker-compose-cluster.yaml
index 48b49e7e..ddf14221 100644
--- a/test/stress/trace/docker-compose-cluster.yaml
+++ b/test/stress/trace/docker-compose-cluster.yaml
@@ -76,6 +76,10 @@ services:
     build:
       dockerfile: ./docker/Dockerfile
       context: ../../..
+    ports:
+    - 17913:17913
+    - 6060:6060
+    - 2121:2121
     deploy:
       resources:
         limits:
@@ -89,7 +93,7 @@ services:
       file: ../../docker/base-compose.yml
       service: oap
    # TODO: use the main repo image once v0.6.0 is released and merged into the main repo
-    image: "ghcr.io/apache/skywalking/data-generator:${SW_OAP_COMMIT}"
+    image: "hanahmily/data-generator:${SW_OAP_COMMIT}"
     environment:
       SW_STORAGE: banyandb
       SW_STORAGE_BANYANDB_TARGETS: "liaison:17912"

Reply via email to