hanahmily commented on code in PR #1177:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1177#discussion_r3411725814


##########
pkg/query/vectorized/trace/limit_carry.go:
##########
@@ -0,0 +1,110 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "context"
+
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// LimitedDistinctTraceID applies MaxTraceSize while carrying kept rows.
+type LimitedDistinctTraceID struct {
+       schema *vectorized.BatchSchema
+       carry  *DistinctTraceID
+       max    uint32
+       seen   uint32
+       closed bool
+}
+
+// NewLimitedDistinctTraceID constructs the Phase-1 limit and carry fusible.
+func NewLimitedDistinctTraceID(schema *vectorized.BatchSchema, maxRows uint32) 
*LimitedDistinctTraceID {
+       return &LimitedDistinctTraceID{
+               schema: schema,
+               carry:  NewDistinctTraceID(schema),
+               max:    maxRows,
+       }
+}
+
+// Init resets carry-forward state.
+func (l *LimitedDistinctTraceID) Init(ctx context.Context) error {
+       l.seen = 0
+       return l.carry.Init(ctx)
+}
+
+// OutputSchema returns the unchanged input schema.
+func (l *LimitedDistinctTraceID) OutputSchema() *vectorized.BatchSchema {
+       return l.schema
+}
+
+// Process rewrites selection to kept rows and records only those rows.
+func (l *LimitedDistinctTraceID) Process(_ context.Context, batch 
*vectorized.RecordBatch) error {
+       if l.max == 0 {
+               out := activeIndices(batch)
+               batch.Selection = out
+               keys := phase1Keys(batch).Data()
+               partIDs := phase1PartIDs(batch).Data()
+               traceIDs := phase1TraceIDs(batch).Data()
+               for _, rowIdx := range out {
+                       traceID := traceIDs[rowIdx]
+                       if _, seen := l.carry.keys[traceID]; seen {
+                               continue
+                       }
+                       partID := partIDs[rowIdx]
+                       l.carry.traceIDsByPart[partID] = 
append(l.carry.traceIDsByPart[partID], traceID)
+                       l.carry.keys[traceID] = keys[rowIdx]
+                       l.carry.order = append(l.carry.order, traceID)
+               }
+               return nil
+       }
+       keys := phase1Keys(batch).Data()
+       partIDs := phase1PartIDs(batch).Data()
+       traceIDs := phase1TraceIDs(batch).Data()
+       out := make([]uint16, 0, batch.ActiveLen())
+       for _, rowIdx := range activeIndices(batch) {
+               if l.seen < l.max {
+                       traceID := traceIDs[rowIdx]
+                       partID := partIDs[rowIdx]
+                       l.carry.traceIDsByPart[partID] = 
append(l.carry.traceIDsByPart[partID], traceID)
+                       l.carry.keys[traceID] = keys[rowIdx]
+                       l.carry.order = append(l.carry.order, traceID)
+                       out = append(out, rowIdx)
+               }
+               l.seen++
+               if l.seen >= l.max {
+                       batch.Selection = out
+                       return vectorized.ErrLimitExhausted
+               }
+       }

Review Comment:
   Fixed in 6ee2f792. Added the same `carry.keys` seen-check to the `max > 0`
path; `l.seen` now increments only for new distinct traceIDs (not for every
row), so both `carry.order` and `carry.traceIDsByPart` are duplicate-free and
the limit correctly counts distinct traces.



##########
banyand/trace/query_vectorized.go:
##########
@@ -0,0 +1,599 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "context"
+       "fmt"
+       "maps"
+       "slices"
+       "sort"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+       vtrace 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/trace"
+)
+
+type vectorizedTraceQueryResult struct {
+       ctx              context.Context
+       cancel           context.CancelFunc
+       finishResultSpan func(int, error)
+       recordResult     func(*model.TraceResult)
+       err              error
+       results          []*model.TraceResult
+       segments         []storage.Segment[*tsTable, option]
+       idx              int
+       released         bool
+}
+
+func newVectorizedTraceQueryResult(
+       ctx context.Context,
+       batch *scanBatch,
+       qo queryOptions,
+       segments []storage.Segment[*tsTable, option],
+       cancel context.CancelFunc,
+       finishResultSpan func(int, error),
+       recordResult func(*model.TraceResult),
+) (*vectorizedTraceQueryResult, error) {
+       results, materializeErr := materializeVectorizedTraceResults(ctx, 
batch, qo)
+       releaseVectorizedScanBatch(batch)
+       if materializeErr != nil {
+               return nil, materializeErr
+       }
+       return &vectorizedTraceQueryResult{
+               ctx:              ctx,
+               cancel:           cancel,
+               finishResultSpan: finishResultSpan,
+               recordResult:     recordResult,
+               results:          results,
+               segments:         segments,
+       }, nil
+}
+
+func (r *vectorizedTraceQueryResult) Pull() *model.TraceResult {
+       select {
+       case <-r.ctx.Done():
+               r.err = r.ctx.Err()
+               return &model.TraceResult{Error: r.err}
+       default:
+       }
+       if r.err != nil {
+               return &model.TraceResult{Error: r.err}
+       }
+       if r.idx >= len(r.results) {
+               return nil
+       }
+       result := r.results[r.idx]
+       r.idx++
+       if r.recordResult != nil {
+               r.recordResult(result)
+       }
+       return result
+}
+
+func (r *vectorizedTraceQueryResult) Release() {
+       if r.released {
+               return
+       }
+       r.released = true
+       traceQueryResultTracker.Release(r)
+       if r.cancel != nil {
+               r.cancel()
+       }
+       for i := range r.segments {
+               r.segments[i].DecRef()
+       }
+       r.segments = nil
+       if r.finishResultSpan != nil {
+               r.finishResultSpan(r.idx, r.err)
+               r.finishResultSpan = nil
+       }
+}
+
+func materializeVectorizedTraceResults(ctx context.Context, batch *scanBatch, 
qo queryOptions) ([]*model.TraceResult, error) {
+       if batch == nil {
+               return nil, nil
+       }
+       if batch.err != nil {
+               return nil, batch.err
+       }
+
+       var budgetBytes int64
+       if qo.QueryMemoryMiB > 0 {
+               budgetBytes = int64(qo.QueryMemoryMiB) * 1024 * 1024
+       }
+       loaded, loadErr := loadTraceCursorsSync(ctx, batch.cursors, budgetBytes)
+       batch.cursors = nil
+       if loadErr != nil {
+               return nil, loadErr
+       }
+       if len(loaded) == 0 {
+               return nil, nil
+       }
+
+       var tagCols []string
+       if qo.TagProjection != nil && len(qo.TagProjection.Names) > 0 {
+               tagCols = append([]string(nil), qo.TagProjection.Names...)
+       }
+       schema := vtrace.NewPhase2Schema(tagCols)
+       source := newLoadedCursorSource(loaded, schema, batch.keys, tagCols, 
qo.schemaTagTypes)
+
+       phase2Plan, buildErr := vtrace.BuildPhase2(source, batch.traceIDsOrder)
+       if buildErr != nil {
+               source.closeAll()
+               return nil, fmt.Errorf("build vectorized trace phase2: %w", 
buildErr)
+       }
+       if initErr := phase2Plan.Pipeline.Init(ctx); initErr != nil {
+               phase2Plan.Pipeline.Close() //nolint:errcheck
+               return nil, fmt.Errorf("init vectorized trace phase2: %w", 
initErr)
+       }
+       defer phase2Plan.Pipeline.Close() //nolint:errcheck
+
+       results := make([]*model.TraceResult, 0, len(batch.traceIDsOrder))
+       for {
+               outBatch, nextErr := phase2Plan.Pipeline.Next(ctx)
+               if nextErr != nil {
+                       return nil, fmt.Errorf("pull vectorized trace phase2: 
%w", nextErr)
+               }
+               if outBatch == nil {
+                       break
+               }
+               result := vtrace.BatchToTraceResult(outBatch, schema)
+               if result != nil {
+                       results = append(results, result)
+               }
+       }
+       return results, nil
+}
+
+// loadedCursorSource is a PullOperator that wraps already-loaded 
[]*blockCursor
+// and emits one Phase-2 RecordBatch per cursor.
+type loadedCursorSource struct {
+       schema         *vectorized.BatchSchema
+       keys           map[string]int64
+       schemaTagTypes map[string]pbv1.ValueType
+       cursors        []*blockCursor
+       tagCols        []string
+       idx            int
+       closed         bool
+}
+
+func newLoadedCursorSource(
+       cursors []*blockCursor,
+       schema *vectorized.BatchSchema,
+       keys map[string]int64,
+       tagCols []string,
+       schemaTagTypes map[string]pbv1.ValueType,
+) *loadedCursorSource {
+       return &loadedCursorSource{
+               cursors:        cursors,
+               schema:         schema,
+               keys:           keys,
+               tagCols:        tagCols,
+               schemaTagTypes: schemaTagTypes,
+       }
+}
+
+// closeAll releases any remaining cursors without marking the source closed.
+func (s *loadedCursorSource) closeAll() {
+       for remainIdx := s.idx; remainIdx < len(s.cursors); remainIdx++ {
+               releaseBlockCursor(s.cursors[remainIdx])
+       }
+       s.idx = len(s.cursors)
+}
+
+func (s *loadedCursorSource) Init(context.Context) error { return nil }
+
+func (s *loadedCursorSource) OutputSchema() *vectorized.BatchSchema { return 
s.schema }
+
+// Close releases all remaining unread cursors. Idempotent.
+func (s *loadedCursorSource) Close() error {
+       if s.closed {
+               return nil
+       }
+       s.closed = true
+       s.closeAll()
+       return nil
+}
+
+// NextBatch emits one Phase-2 RecordBatch per loaded blockCursor.
+// Each span in the cursor becomes one row in the batch.
+func (s *loadedCursorSource) NextBatch(ctx context.Context) 
(*vectorized.RecordBatch, error) {
+       for s.idx < len(s.cursors) {
+               if ctxErr := ctx.Err(); ctxErr != nil {
+                       s.closeAll()
+                       return nil, ctxErr
+               }
+               bc := s.cursors[s.idx]
+               s.idx++
+               if len(bc.spans) == 0 {
+                       releaseBlockCursor(bc)
+                       continue
+               }
+
+               batch := vectorized.NewRecordBatch(s.schema, len(bc.spans))
+               tid := bc.bm.traceID
+               key := s.keys[tid]
+
+               tidCol := vtrace.Phase2TraceIDs(batch)
+               keyCol := vtrace.Phase2Keys(batch)
+               spanCol := vtrace.Phase2Spans(batch)
+               spanIDCol := vtrace.Phase2SpanIDs(batch)
+               // Tag-column handles and the cursor's matching tag are 
loop-invariant
+               // across spans; resolve them once per cursor.
+               tagCols := make([]*vectorized.TypedColumn[*modelv1.TagValue], 
len(s.tagCols))
+               cursorTags := make([]*tag, len(s.tagCols))
+               for tagIdx, tagName := range s.tagCols {
+                       tagCols[tagIdx] = vtrace.Phase2TagCol(batch, tagIdx)
+                       cursorTags[tagIdx] = findCursorTag(bc, tagName, 
s.schemaTagTypes)
+               }
+
+               for spanIdx, span := range bc.spans {
+                       tidCol.Append(tid)
+                       keyCol.Append(key)
+                       spanCol.Append(span)
+                       spanID := ""
+                       if spanIdx < len(bc.spanIDs) {
+                               spanID = bc.spanIDs[spanIdx]
+                       }
+                       spanIDCol.Append(spanID)
+
+                       for tagIdx := range s.tagCols {
+                               tv := pbv1.NullTagValue
+                               if cursorTag := cursorTags[tagIdx]; cursorTag 
!= nil && spanIdx < len(cursorTag.values) {
+                                       tv = 
mustDecodeTagValue(cursorTag.valueType, cursorTag.values[spanIdx])
+                               }
+                               tagCols[tagIdx].Append(tv)
+                       }
+               }
+               batch.Len = len(bc.spans)
+               releaseBlockCursor(bc)
+               return batch, nil
+       }
+       return nil, nil
+}
+
+// findCursorTag returns the cursor's tag whose decoded name and value type 
match
+// the schema expectation, or nil when the cursor has no such tag. A name match
+// with a mismatched type keeps scanning so a correctly-typed variant still 
wins.
+func findCursorTag(bc *blockCursor, tagName string, schemaTagTypes 
map[string]pbv1.ValueType) *tag {
+       schemaType, hasSchemaType := schemaTagTypes[tagName]
+       if !hasSchemaType {
+               return nil
+       }
+       for tagIdx := range bc.tags {
+               cursorTag := &bc.tags[tagIdx]
+               if decodeTypedTag(cursorTag.name) != tagName || 
cursorTag.valueType != schemaType {
+                       continue
+               }
+               return cursorTag
+       }
+       return nil
+}
+
+// loadTraceCursorsSync loads span data for each cursor from disk.
+// budgetBytes is a soft span-loading threshold: it caps the cumulative 
uncompressed
+// span bytes loaded across cursors. SIDX responses, tags, record-batch 
overhead, and
+// other per-query allocations are not counted. Pass 0 to disable the cap.
+// Two complementary gates enforce the threshold:
+//  1. Hard stop: once usedBytes >= budgetBytes, remaining cursors are released
+//     without calling loadData.
+//  2. Metadata preflight: cursor.bm.uncompressedSpanSizeBytes (written at 
flush,
+//     available without loading) is used to predict whether a cursor would 
push
+//     usedBytes over budgetBytes. If so the cursor is skipped before loadData.
+//
+// First-block exception: the first cursor always loads regardless of the 
budget
+// so that a query never returns zero results due to a too-small budget. This 
means
+// a single oversized block can exceed budgetBytes; the threshold is 
best-effort,
+// not a hard memory cap.
+func loadTraceCursorsSync(ctx context.Context, cursors []*blockCursor, 
budgetBytes int64) ([]*blockCursor, error) {
+       if len(cursors) == 0 {
+               return nil, nil
+       }
+       var usedBytes int64
+       filtered := cursors[:0]
+       for curIdx, cursor := range cursors {
+               select {
+               case <-ctx.Done():
+                       // filtered aliases cursors[:0]: its entries are the 
loaded cursors,
+                       // cursors[curIdx:] are untouched; everything in 
between was already released.
+                       for _, loadedCursor := range filtered {
+                               releaseBlockCursor(loadedCursor)
+                       }
+                       for _, pendingCursor := range cursors[curIdx:] {
+                               releaseBlockCursor(pendingCursor)
+                       }
+                       return nil, fmt.Errorf("interrupt while loading trace 
data: %w", ctx.Err())
+               default:
+               }
+               if budgetBytes > 0 {
+                       // Hard stop: prior cursors already filled the budget.
+                       if usedBytes >= budgetBytes {
+                               releaseBlockCursor(cursor)
+                               for _, pendingCursor := range 
cursors[curIdx+1:] {
+                                       releaseBlockCursor(pendingCursor)
+                               }
+                               return filtered, nil
+                       }
+                       // Metadata preflight: skip this cursor without calling 
loadData when its
+                       // uncompressed-size estimate would push usedBytes over 
the budget.
+                       // The guard usedBytes > 0 ensures the first cursor 
always loads.
+                       if usedBytes > 0 && 
usedBytes+int64(cursor.bm.uncompressedSpanSizeBytes) > budgetBytes {
+                               releaseBlockCursor(cursor)
+                               for _, pendingCursor := range 
cursors[curIdx+1:] {
+                                       releaseBlockCursor(pendingCursor)
+                               }
+                               return filtered, nil
+                       }
+               }
+               tmpBlock := generateBlock()
+               loaded := cursor.loadData(tmpBlock)
+               releaseBlock(tmpBlock)
+               if !loaded {
+                       releaseBlockCursor(cursor)
+                       continue
+               }
+               if budgetBytes > 0 {
+                       for _, span := range cursor.spans {
+                               usedBytes += int64(len(span))
+                       }
+               }
+               filtered = append(filtered, cursor)
+       }
+       return filtered, nil
+}
+
+func releaseVectorizedScanBatch(batch *scanBatch) {
+       if batch == nil {
+               return
+       }
+       for _, cursor := range batch.cursors {
+               if cursor != nil {
+                       releaseBlockCursor(cursor)
+               }
+       }
+       batch.cursors = nil
+       for _, snapshot := range batch.snapshots {
+               snapshot.decRef()
+       }
+       batch.snapshots = nil
+}
+
+func sidxInstancesToVectorizedIterators(
+       ctx context.Context,
+       instances []sidx.SIDX,
+       req sidx.QueryRequest,
+) ([]itersort.Iterator[*vtrace.MergeItem], error) {
+       iters := make([]itersort.Iterator[*vtrace.MergeItem], 0, len(instances))
+       for instanceIdx, instance := range instances {
+               responses, queryErr := instance.QuerySync(ctx, req)
+               if queryErr != nil {
+                       return nil, fmt.Errorf("query sidx instance %d 
synchronously: %w", instanceIdx, queryErr)
+               }
+               batches, convertErr := 
sidxResponsesToVectorizedBatches(responses)
+               if convertErr != nil {
+                       return nil, fmt.Errorf("convert sidx instance %d 
response: %w", instanceIdx, convertErr)
+               }
+               iters = append(iters, vtrace.NewSidxResponseIterator(batches))
+       }
+       return iters, nil
+}
+
+func (t *trace) buildVectorizedPhase1TraceBatch(
+       ctx context.Context,
+       qo queryOptions,
+       sidxInstances []sidx.SIDX,
+       req sidx.QueryRequest,
+       useSIDX bool,
+       maxTraceSize int,
+) (traceBatch, error) {
+       batchSize := t.vectorized.BatchSize
+       maxRows := uint32(0)
+       if maxTraceSize > 0 {
+               maxRows = uint32(maxTraceSize)
+       }
+       switch {
+       case len(qo.traceIDs) > 0:
+               // Mirror the push path: truncate the raw list to maxTraceSize 
first (preserving
+               // duplicates), then let Phase-1's DistinctTraceID deduplicate 
within that window.
+               ids := qo.traceIDs
+               if maxRows > 0 && int(maxRows) < len(ids) {
+                       ids = ids[:maxRows]
+               }
+               plan, buildErr := vtrace.BuildStaticPhase1(ids, nil, 0, 
batchSize)
+               if buildErr != nil {
+                       return traceBatch{}, fmt.Errorf("build static 
vectorized trace phase1: %w", buildErr)
+               }
+               return drainVectorizedPhase1(ctx, plan, 0)
+       case useSIDX:
+               iters, iterErr := sidxInstancesToVectorizedIterators(ctx, 
sidxInstances, req)
+               if iterErr != nil {
+                       return traceBatch{}, iterErr
+               }
+               // Ordered SIDX path: MaxTraceSize controls SIDX batch size 
(MaxBatchSize in the
+               // request), not the total result cap. The push path emits all 
matching traces in
+               // batches; vectorized must do the same. Pass 0 to disable the 
Phase-1 trace cap.
+               plan, buildErr := vtrace.BuildMergePhase1(iters, 
sidxRequestDesc(req), 0, batchSize)
+               if buildErr != nil {
+                       return traceBatch{}, fmt.Errorf("build ordered 
vectorized trace phase1: %w", buildErr)
+               }
+               return drainVectorizedPhase1(ctx, plan, 0)
+       default:
+               return traceBatch{}, fmt.Errorf("invalid query options: either 
traceIDs or order must be specified")
+       }
+}
+
+func (t *trace) buildVectorizedScanBatch(ctx context.Context, tables 
[]*tsTable, qo queryOptions, batch traceBatch) (*scanBatch, error) {
+       if batch.err != nil {
+               return &scanBatch{traceBatch: batch, err: batch.err}, nil
+       }
+       snapshots := make([]*snapshot, 0, len(tables))
+       for _, table := range tables {
+               s := table.currentSnapshot()
+               if s == nil {
+                       continue
+               }
+               snapshots = append(snapshots, s)
+       }
+       if len(snapshots) == 0 {
+               return &scanBatch{traceBatch: batch}, nil
+       }
+
+       partSelectionCtx, finishPartSelection := startPartSelectionSpan(ctx, 
&batch, snapshots)
+       parts, groupedIDs, metrics := selectVectorizedTraceParts(batch, 
snapshots)
+       if finishPartSelection != nil {
+               finishPartSelection(metrics, len(parts))
+       }
+       cursors, scanErr := t.scanPartsInlineSync(partSelectionCtx, parts, 
groupedIDs, qo)
+       if scanErr != nil {
+               for _, s := range snapshots {
+                       s.decRef()
+               }
+               return nil, scanErr
+       }
+       return &scanBatch{
+               traceBatch: batch,
+               cursors:    cursors,
+               snapshots:  snapshots,
+       }, nil
+}
+
+func selectVectorizedTraceParts(batch traceBatch, snapshots []*snapshot) 
([]*part, [][]string, *partSelectionMetrics) {
+       parts := make([]*part, 0)
+       groupedIDs := make([][]string, 0)
+       allTraceIDs := make([]string, 0)
+       for _, ids := range batch.traceIDs {
+               allTraceIDs = append(allTraceIDs, ids...)
+       }
+       sort.Strings(allTraceIDs)
+
+       bloomFilteredPartIDs := make([]uint64, 0)
+       totalGroupedIDs := 0
+       for _, s := range snapshots {
+               for _, pw := range s.parts {
+                       p := pw.p
+                       partID := p.partMetadata.ID
+
+                       var idsFromSIDX []string
+                       if traceIDsFromSIDX, exists := batch.traceIDs[partID]; 
exists {
+                               idsFromSIDX = append([]string(nil), 
traceIDsFromSIDX...)
+                       }
+                       var idsForPart []string
+                       for _, traceID := range allTraceIDs {
+                               if slices.Contains(idsFromSIDX, traceID) || 
p.traceIDFilter.filter.MightContain(convert.StringToBytes(traceID)) {
+                                       idsForPart = append(idsForPart, traceID)
+                               }
+                       }

Review Comment:
   Fixed in 6ee2f792. Replaced the `[]string` copy + `slices.Contains` with a
`map[string]struct{}` built once per part before the inner loop, reducing each
membership test from O(m) (m = number of SIDX trace IDs recorded for the part)
to O(1) on average.



##########
banyand/trace/query_vectorized.go:
##########
@@ -0,0 +1,599 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "context"
+       "fmt"
+       "maps"
+       "slices"
+       "sort"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+       vtrace 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/trace"
+)
+
+type vectorizedTraceQueryResult struct {
+       ctx              context.Context
+       cancel           context.CancelFunc
+       finishResultSpan func(int, error)
+       recordResult     func(*model.TraceResult)
+       err              error
+       results          []*model.TraceResult
+       segments         []storage.Segment[*tsTable, option]
+       idx              int
+       released         bool
+}
+
+func newVectorizedTraceQueryResult(
+       ctx context.Context,
+       batch *scanBatch,
+       qo queryOptions,
+       segments []storage.Segment[*tsTable, option],
+       cancel context.CancelFunc,
+       finishResultSpan func(int, error),
+       recordResult func(*model.TraceResult),
+) (*vectorizedTraceQueryResult, error) {
+       results, materializeErr := materializeVectorizedTraceResults(ctx, 
batch, qo)
+       releaseVectorizedScanBatch(batch)
+       if materializeErr != nil {
+               return nil, materializeErr
+       }
+       return &vectorizedTraceQueryResult{
+               ctx:              ctx,
+               cancel:           cancel,
+               finishResultSpan: finishResultSpan,
+               recordResult:     recordResult,
+               results:          results,
+               segments:         segments,
+       }, nil
+}
+
+func (r *vectorizedTraceQueryResult) Pull() *model.TraceResult {
+       select {
+       case <-r.ctx.Done():
+               r.err = r.ctx.Err()
+               return &model.TraceResult{Error: r.err}
+       default:
+       }
+       if r.err != nil {
+               return &model.TraceResult{Error: r.err}
+       }
+       if r.idx >= len(r.results) {
+               return nil
+       }
+       result := r.results[r.idx]
+       r.idx++
+       if r.recordResult != nil {
+               r.recordResult(result)
+       }
+       return result
+}
+
+func (r *vectorizedTraceQueryResult) Release() {
+       if r.released {
+               return
+       }
+       r.released = true
+       traceQueryResultTracker.Release(r)
+       if r.cancel != nil {
+               r.cancel()
+       }
+       for i := range r.segments {
+               r.segments[i].DecRef()
+       }
+       r.segments = nil
+       if r.finishResultSpan != nil {
+               r.finishResultSpan(r.idx, r.err)
+               r.finishResultSpan = nil
+       }
+}
+
+func materializeVectorizedTraceResults(ctx context.Context, batch *scanBatch, 
qo queryOptions) ([]*model.TraceResult, error) {
+       if batch == nil {
+               return nil, nil
+       }
+       if batch.err != nil {
+               return nil, batch.err
+       }
+
+       var budgetBytes int64
+       if qo.QueryMemoryMiB > 0 {
+               budgetBytes = int64(qo.QueryMemoryMiB) * 1024 * 1024
+       }
+       loaded, loadErr := loadTraceCursorsSync(ctx, batch.cursors, budgetBytes)
+       batch.cursors = nil
+       if loadErr != nil {
+               return nil, loadErr
+       }
+       if len(loaded) == 0 {
+               return nil, nil
+       }
+
+       var tagCols []string
+       if qo.TagProjection != nil && len(qo.TagProjection.Names) > 0 {
+               tagCols = append([]string(nil), qo.TagProjection.Names...)
+       }
+       schema := vtrace.NewPhase2Schema(tagCols)
+       source := newLoadedCursorSource(loaded, schema, batch.keys, tagCols, 
qo.schemaTagTypes)
+
+       phase2Plan, buildErr := vtrace.BuildPhase2(source, batch.traceIDsOrder)
+       if buildErr != nil {
+               source.closeAll()
+               return nil, fmt.Errorf("build vectorized trace phase2: %w", 
buildErr)
+       }
+       if initErr := phase2Plan.Pipeline.Init(ctx); initErr != nil {
+               phase2Plan.Pipeline.Close() //nolint:errcheck
+               return nil, fmt.Errorf("init vectorized trace phase2: %w", 
initErr)
+       }
+       defer phase2Plan.Pipeline.Close() //nolint:errcheck
+
+       results := make([]*model.TraceResult, 0, len(batch.traceIDsOrder))
+       for {
+               outBatch, nextErr := phase2Plan.Pipeline.Next(ctx)
+               if nextErr != nil {
+                       return nil, fmt.Errorf("pull vectorized trace phase2: 
%w", nextErr)
+               }
+               if outBatch == nil {
+                       break
+               }
+               result := vtrace.BatchToTraceResult(outBatch, schema)
+               if result != nil {
+                       results = append(results, result)
+               }
+       }
+       return results, nil
+}
+
+// loadedCursorSource is a PullOperator that wraps already-loaded 
[]*blockCursor
+// and emits one Phase-2 RecordBatch per cursor.
+type loadedCursorSource struct {
+       schema         *vectorized.BatchSchema // returned by OutputSchema and used to allocate every batch
+       keys           map[string]int64 // looked up by trace ID in NextBatch to fill the key column
+       schemaTagTypes map[string]pbv1.ValueType // expected value type per tag name; consulted by findCursorTag
+       cursors        []*blockCursor // entries before idx were already consumed by NextBatch or released by closeAll
+       tagCols        []string // tag names to emit; position i feeds Phase-2 tag column i
+       idx            int // index of the next cursor to emit or release
+       closed         bool // set by Close so that repeated calls return early
+}
+
+func newLoadedCursorSource( // newLoadedCursorSource builds a source that has not emitted any cursor yet.
+       cursors []*blockCursor,
+       schema *vectorized.BatchSchema,
+       keys map[string]int64,
+       tagCols []string,
+       schemaTagTypes map[string]pbv1.ValueType,
+) *loadedCursorSource {
+       return &loadedCursorSource{ // idx and closed keep their zero values
+               cursors:        cursors, // slices and maps are stored as passed, not copied
+               schema:         schema,
+               keys:           keys,
+               tagCols:        tagCols,
+               schemaTagTypes: schemaTagTypes,
+       }
+}
+
+// closeAll releases any remaining cursors without marking the source closed.
+func (s *loadedCursorSource) closeAll() {
+       for remainIdx := s.idx; remainIdx < len(s.cursors); remainIdx++ { // cursors before s.idx were already taken by NextBatch
+               releaseBlockCursor(s.cursors[remainIdx])
+       }
+       s.idx = len(s.cursors) // a repeated closeAll or a later NextBatch now finds nothing left
+}
+
+func (s *loadedCursorSource) Init(context.Context) error { return nil } // Init does nothing and always returns nil; the context is ignored.
+
+func (s *loadedCursorSource) OutputSchema() *vectorized.BatchSchema { return 
s.schema } // OutputSchema returns the schema supplied at construction.
+
+// Close releases all remaining unread cursors. Idempotent; always returns nil.
+func (s *loadedCursorSource) Close() error {
+       if s.closed {
+               return nil
+       }
+       s.closed = true // set before releasing so a repeated Close takes the early return above
+       s.closeAll()
+       return nil
+}
+
+// NextBatch emits one Phase-2 RecordBatch per loaded blockCursor; each span becomes one row and cursors without spans are skipped.
+// It returns (nil, nil) when no cursors remain, or ctx.Err() after releasing the unread cursors when ctx is done.
+func (s *loadedCursorSource) NextBatch(ctx context.Context) 
(*vectorized.RecordBatch, error) {
+       for s.idx < len(s.cursors) {
+               if ctxErr := ctx.Err(); ctxErr != nil { // checked once per cursor, before touching it
+                       s.closeAll() // release everything unread; later calls then return (nil, nil)
+                       return nil, ctxErr
+               }
+               bc := s.cursors[s.idx]
+               s.idx++ // advance first so closeAll never releases bc a second time
+               if len(bc.spans) == 0 { // an empty cursor yields no batch; release it and try the next one
+                       releaseBlockCursor(bc)
+                       continue
+               }
+
+               batch := vectorized.NewRecordBatch(s.schema, len(bc.spans))
+               tid := bc.bm.traceID
+               key := s.keys[tid] // zero when keys has no entry for this trace ID
+
+               tidCol := vtrace.Phase2TraceIDs(batch)
+               keyCol := vtrace.Phase2Keys(batch)
+               spanCol := vtrace.Phase2Spans(batch)
+               spanIDCol := vtrace.Phase2SpanIDs(batch)
+               // Tag-column handles and the cursor's matching tag are 
loop-invariant
+               // across spans; resolve them once per cursor.
+               tagCols := make([]*vectorized.TypedColumn[*modelv1.TagValue], 
len(s.tagCols))
+               cursorTags := make([]*tag, len(s.tagCols))
+               for tagIdx, tagName := range s.tagCols {
+                       tagCols[tagIdx] = vtrace.Phase2TagCol(batch, tagIdx)
+                       cursorTags[tagIdx] = findCursorTag(bc, tagName, 
s.schemaTagTypes) // nil when the cursor has no tag with this name and the schema's type
+               }
+
+               for spanIdx, span := range bc.spans {
+                       tidCol.Append(tid) // trace ID and key repeat on every row of this cursor
+                       keyCol.Append(key)
+                       spanCol.Append(span)
+                       spanID := ""
+                       if spanIdx < len(bc.spanIDs) { // spanIDs may be shorter than spans; missing IDs stay ""
+                               spanID = bc.spanIDs[spanIdx]
+                       }
+                       spanIDCol.Append(spanID)
+
+                       for tagIdx := range s.tagCols {
+                               tv := pbv1.NullTagValue // used when the cursor lacks the tag or has no value at spanIdx
+                               if cursorTag := cursorTags[tagIdx]; cursorTag 
!= nil && spanIdx < len(cursorTag.values) {
+                                       tv = 
mustDecodeTagValue(cursorTag.valueType, cursorTag.values[spanIdx]) // NOTE(review): the "must" prefix suggests a panic on undecodable input — confirm
+                               }
+                               tagCols[tagIdx].Append(tv) // every tag column receives exactly one value per span
+                       }
+               }
+               batch.Len = len(bc.spans)
+               releaseBlockCursor(bc) // NOTE(review): confirm the appended rows do not alias memory owned by the released cursor
+               return batch, nil
+       }
+       return nil, nil // no cursors left: nil batch with nil error
+}
+
+// findCursorTag returns the cursor's tag whose decoded name and value type 
match
+// the schema expectation, or nil when the cursor has no such tag. A name match
+// with a mismatched type keeps scanning so a correctly-typed variant still 
wins.
+func findCursorTag(bc *blockCursor, tagName string, schemaTagTypes 
map[string]pbv1.ValueType) *tag {
+       schemaType, hasSchemaType := schemaTagTypes[tagName]
+       if !hasSchemaType { // a tag name absent from schemaTagTypes never matches
+               return nil
+       }
+       for tagIdx := range bc.tags {
+               cursorTag := &bc.tags[tagIdx] // pointer into bc.tags, not a copy
+               if decodeTypedTag(cursorTag.name) != tagName || 
cursorTag.valueType != schemaType { // both the decoded name and the value type must agree
+                       continue
+               }
+               return cursorTag // first tag satisfying both conditions wins
+       }
+       return nil
+}
+
+// loadTraceCursorsSync loads span data for each cursor from disk.
+// budgetBytes is a soft span-loading threshold: it caps the cumulative 
uncompressed
+// span bytes loaded across cursors. SIDX responses, tags, record-batch 
overhead, and
+// other per-query allocations are not counted. Pass 0 to disable the cap.
+// Two complementary gates enforce the threshold:

Review Comment:
   Fixed in 6ee2f792. Updated the comment to read "Pass 0 (or any non-positive 
value) to disable the cap" to match the `budgetBytes > 0` gate in the 
implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to