This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 27c656d8 Implement traceID-based query (#735)
27c656d8 is described below

commit 27c656d8ebb95fd5b0823110c42507efb52a4261
Author: Huang Youliang <[email protected]>
AuthorDate: Tue Aug 26 22:19:09 2025 +0800

    Implement traceID-based query (#735)
    
    * Implement traceID-based query
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                    |   4 +
 banyand/trace/block.go        |  10 +-
 banyand/trace/block_writer.go |   1 -
 banyand/trace/merger.go       |  30 ++++-
 banyand/trace/part_iter.go    |  68 +++--------
 banyand/trace/query.go        | 277 ++++++++++++++++++++++++++++++++++++++++--
 banyand/trace/query_test.go   | 202 ++++++++++++++++++++++++++++++
 banyand/trace/snapshot.go     |   3 +-
 banyand/trace/tracing.go      |  29 ++++-
 banyand/trace/tstable.go      |  96 +++------------
 banyand/trace/tstable_test.go |  34 +++---
 pkg/query/model/model.go      |  10 +-
 12 files changed, 588 insertions(+), 176 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ae00c13d..446f62c6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -33,6 +33,10 @@ Release Notes.
 - Enhance flusher and introducer loops to support merging operations, improving efficiency by eliminating the need for a separate merge loop and optimizing data handling process during flushing and merging
 - Enhance stream synchronization with configurable sync interval - Allows customization of synchronization timing for better performance tuning
 - Refactor flusher and introducer loops to support conditional merging - Optimizes data processing by adding conditional logic to merge operations
+- New storage engine for trace:
+  - Data ingestion and retrieval.
+  - Flush memory data to disk.
+  - Merge memory data and disk data.
 
 ### Bug Fixes
 
diff --git a/banyand/trace/block.go b/banyand/trace/block.go
index 404bb158..1fc28c26 100644
--- a/banyand/trace/block.go
+++ b/banyand/trace/block.go
@@ -288,6 +288,7 @@ func mustReadSpansFrom(spans [][]byte, sm *dataBlock, count 
int, reader fs.Reade
                if uint64(len(src)) < spanLen {
                        logger.Panicf("insufficient data for span: need %d 
bytes, have %d", spanLen, len(src))
                }
+               spans[i] = spans[i][:0]
                spans[i] = append(spans[i], src[:spanLen]...)
                src = src[spanLen:]
        }
@@ -311,6 +312,7 @@ func mustSeqReadSpansFrom(spans [][]byte, sm *dataBlock, 
count int, reader *seqR
                if uint64(len(src)) < spanLen {
                        logger.Panicf("insufficient data for span: need %d 
bytes, have %d", spanLen, len(src))
                }
+               spans[i] = spans[i][:0]
                spans[i] = append(spans[i], src[:spanLen]...)
                src = src[spanLen:]
        }
@@ -381,11 +383,7 @@ func (bc *blockCursor) copyAllTo(r *model.TraceResult, 
desc bool) {
                return
        }
 
-       requiredCapacity := end - start
-       r.TIDs = append(r.TIDs, make([]string, requiredCapacity)...)
-       for i := range r.TIDs[len(r.TIDs)-requiredCapacity:] {
-               r.TIDs[len(r.TIDs)-requiredCapacity+i] = bc.bm.traceID
-       }
+       r.TID = bc.bm.traceID
        r.Spans = append(r.Spans, bc.spans[start:end]...)
 
        if desc {
@@ -417,7 +415,7 @@ func (bc *blockCursor) copyAllTo(r *model.TraceResult, desc 
bool) {
 
 func (bc *blockCursor) copyTo(r *model.TraceResult) {
        r.Spans = append(r.Spans, bc.spans[bc.idx])
-       r.TIDs = append(r.TIDs, bc.bm.traceID)
+       r.TID = bc.bm.traceID
        if len(r.Tags) != len(bc.tagProjection.Names) {
                for _, name := range bc.tagProjection.Names {
                        r.Tags = append(r.Tags, model.Tag{Name: name})
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index 9927dbe6..f7c46c77 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -270,7 +270,6 @@ func (bw *blockWriter) Flush(pm *partMetadata, tf 
*traceIDFilter, tt *tagType) {
        pm.UncompressedSpanSizeBytes = bw.totalUncompressedSpanSizeBytes
        pm.TotalCount = bw.totalCount
        pm.BlocksCount = bw.totalBlocksCount
-       // TODO: update timestamp metadata when merging blocks
        pm.MinTimestamp = bw.totalMinTimestamp
        pm.MaxTimestamp = bw.totalMaxTimestamp
 
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 89a8fb98..d9dcb3d2 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -260,7 +260,25 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, 
closeCh <-chan struct{}
                }
        }
 
-       pm, err := mergeBlocks(closeCh, bw, br)
+       var minTimestamp, maxTimestamp int64
+       for i, pw := range parts {
+               pm := pw.p.partMetadata
+               if i == 0 {
+                       minTimestamp = pm.MinTimestamp
+                       maxTimestamp = pm.MaxTimestamp
+                       continue
+               }
+               if pm.MinTimestamp < minTimestamp {
+                       minTimestamp = pm.MinTimestamp
+               }
+               if pm.MaxTimestamp > maxTimestamp {
+                       maxTimestamp = pm.MaxTimestamp
+               }
+       }
+
+       pm, tf, tt, err := mergeBlocks(closeCh, bw, br)
+       pm.MinTimestamp = minTimestamp
+       pm.MaxTimestamp = maxTimestamp
        releaseBlockWriter(bw)
        releaseBlockReader(br)
        for i := range pii {
@@ -270,6 +288,8 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, 
closeCh <-chan struct{}
                return nil, err
        }
        pm.mustWriteMetadata(fileSystem, dstPath)
+       tf.mustWriteTraceIDFilter(fileSystem, dstPath)
+       tt.mustWriteTagType(fileSystem, dstPath)
        fileSystem.SyncPath(dstPath)
        p := mustOpenFilePart(partID, root, fileSystem)
 
@@ -278,7 +298,7 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, 
closeCh <-chan struct{}
 
 var errClosed = fmt.Errorf("the merger is closed")
 
-func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) 
(*partMetadata, error) {
+func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) 
(*partMetadata, *traceIDFilter, *tagType, error) {
        pendingBlockIsEmpty := true
        pendingBlock := generateBlockPointer()
        defer releaseBlockPointer(pendingBlock)
@@ -299,7 +319,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, 
br *blockReader) (*pa
        for br.nextBlockMetadata() {
                select {
                case <-closeCh:
-                       return nil, errClosed
+                       return nil, nil, nil, errClosed
                default:
                }
                b := br.block
@@ -342,7 +362,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, 
br *blockReader) (*pa
                pendingBlockIsEmpty = true
        }
        if err := br.error(); err != nil {
-               return nil, fmt.Errorf("cannot read block to merge: %w", err)
+               return nil, nil, nil, fmt.Errorf("cannot read block to merge: 
%w", err)
        }
        if !pendingBlockIsEmpty {
                bw.mustWriteBlock(pendingBlock.bm.traceID, &pendingBlock.block)
@@ -352,7 +372,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, 
br *blockReader) (*pa
        var tf traceIDFilter
        tt := make(tagType)
        bw.Flush(&pm, &tf, &tt)
-       return &pm, nil
+       return &pm, &tf, &tt, nil
 }
 
 func mergeTwoBlocks(target, left, right *blockPointer) {
diff --git a/banyand/trace/part_iter.go b/banyand/trace/part_iter.go
index fc36f291..9982b818 100644
--- a/banyand/trace/part_iter.go
+++ b/banyand/trace/part_iter.go
@@ -36,21 +36,17 @@ type partIter struct {
        err                  error
        p                    *part
        curBlock             *blockMetadata
-       tids                 []string
+       tid                  string
        primaryBlockMetadata []primaryBlockMetadata
        bms                  []blockMetadata
        compressedPrimaryBuf []byte
        primaryBuf           []byte
-       tidIdx               int
-       minTimestamp         int64
-       maxTimestamp         int64
 }
 
 func (pi *partIter) reset() {
        pi.curBlock = nil
        pi.p = nil
-       pi.tids = nil
-       pi.tidIdx = 0
+       pi.tid = ""
        pi.primaryBlockMetadata = nil
        pi.bms = nil
        pi.compressedPrimaryBuf = pi.compressedPrimaryBuf[:0]
@@ -58,19 +54,17 @@ func (pi *partIter) reset() {
        pi.err = nil
 }
 
-func (pi *partIter) init(bma *blockMetadataArray, p *part, tids []string, 
minTimestamp, maxTimestamp int64) {
+func (pi *partIter) init(bma *blockMetadataArray, p *part, tid string) {
        pi.reset()
        pi.curBlock = &blockMetadata{}
        pi.p = p
 
        pi.bms = bma.arr
-       pi.tids = tids
-       pi.minTimestamp = minTimestamp
-       pi.maxTimestamp = maxTimestamp
+       pi.tid = tid
 
        pi.primaryBlockMetadata = p.primaryBlockMetadata
 
-       pi.nextTraceID()
+       pi.curBlock.traceID = tid
 }
 
 func (pi *partIter) nextBlock() bool {
@@ -96,47 +90,14 @@ func (pi *partIter) error() error {
        return pi.err
 }
 
-func (pi *partIter) nextTraceID() bool {
-       if pi.tidIdx >= len(pi.tids) {
-               pi.err = io.EOF
-               return false
-       }
-       pi.curBlock.traceID = pi.tids[pi.tidIdx]
-       pi.tidIdx++
-       return true
-}
-
-func (pi *partIter) searchTargetTraceID(tid string) bool {
-       if pi.curBlock.traceID >= tid {
-               return true
-       }
-       if !pi.nextTraceID() {
-               return false
-       }
-       if pi.curBlock.traceID >= tid {
-               return true
-       }
-       tids := pi.tids[pi.tidIdx:]
-       pi.tidIdx += sort.Search(len(tids), func(i int) bool {
-               return tid <= tids[i]
-       })
-       if pi.tidIdx >= len(pi.tids) {
-               pi.tidIdx = len(pi.tids)
-               pi.err = io.EOF
-               return false
-       }
-       pi.curBlock.traceID = pi.tids[pi.tidIdx]
-       pi.tidIdx++
-       return true
-}
-
 func (pi *partIter) loadNextBlockMetadata() bool {
        if len(pi.primaryBlockMetadata) > 0 {
-               if !pi.searchTargetTraceID(pi.primaryBlockMetadata[0].traceID) {
+               if pi.curBlock.traceID < pi.primaryBlockMetadata[0].traceID {
+                       pi.err = io.EOF
                        return false
                }
-               pi.primaryBlockMetadata = searchPBM(pi.primaryBlockMetadata, 
pi.curBlock.traceID)
 
+               pi.primaryBlockMetadata = searchPBM(pi.primaryBlockMetadata, 
pi.curBlock.traceID)
                pbm := &pi.primaryBlockMetadata[0]
                pi.primaryBlockMetadata = pi.primaryBlockMetadata[1:]
                if pi.curBlock.traceID < pbm.traceID {
@@ -193,24 +154,23 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata, 
mr *primaryBlockMetada
 
 func (pi *partIter) findBlock() bool {
        bhs := pi.bms
-       for len(bhs) > 0 {
+       if len(bhs) > 0 {
                tid := pi.curBlock.traceID
                if bhs[0].traceID < tid {
                        n := sort.Search(len(bhs), func(i int) bool {
                                return tid <= bhs[i].traceID
                        })
                        if n == len(bhs) {
-                               break
+                               pi.bms = nil
+                               return false
                        }
                        bhs = bhs[n:]
                }
                bm := &bhs[0]
 
-               if bm.traceID != tid {
-                       if !pi.searchTargetTraceID(bm.traceID) {
-                               return false
-                       }
-                       continue
+               if bm.traceID > tid {
+                       pi.bms = bhs[:0]
+                       return false
                }
 
                pi.curBlock = bm
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 503fb2ec..df81f15c 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -18,39 +18,298 @@
 package trace
 
 import (
-       "github.com/apache/skywalking-banyandb/api/common"
+       "container/heap"
+       "context"
+       "fmt"
+       "sort"
+
+       "github.com/pkg/errors"
+
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
+const checkDoneEvery = 128
+
+var nilResult = model.TraceQueryResult(nil)
+
 type queryOptions struct {
+       traceID string
        model.TraceQueryOptions
-       seriesToEntity map[common.SeriesID][]*modelv1.TagValue
-       sortedSids     []common.SeriesID
-       minTimestamp   int64
-       maxTimestamp   int64
+       minTimestamp int64
+       maxTimestamp int64
 }
 
 func (qo *queryOptions) reset() {
        qo.TraceQueryOptions.Reset()
-       qo.seriesToEntity = nil
-       qo.sortedSids = nil
+       qo.traceID = ""
        qo.minTimestamp = 0
        qo.maxTimestamp = 0
 }
 
 func (qo *queryOptions) copyFrom(other *queryOptions) {
        qo.TraceQueryOptions.CopyFrom(&other.TraceQueryOptions)
-       qo.seriesToEntity = other.seriesToEntity
-       qo.sortedSids = other.sortedSids
+       qo.traceID = other.traceID
        qo.minTimestamp = other.minTimestamp
        qo.maxTimestamp = other.maxTimestamp
 }
 
+func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) 
(model.TraceQueryResult, error) {
+       if tqo.TimeRange == nil {
+               return nil, errors.New("invalid query options: timeRange are 
required")
+       }
+       if tqo.TagProjection == nil || len(tqo.TagProjection.Names) == 0 {
+               return nil, errors.New("invalid query options: tagProjection is 
required")
+       }
+       var tsdb storage.TSDB[*tsTable, option]
+       var err error
+       db := t.tsdb.Load()
+       if db == nil {
+               tsdb, err = t.schemaRepo.loadTSDB(t.group)
+               if err != nil {
+                       return nil, err
+               }
+               t.tsdb.Store(tsdb)
+       } else {
+               tsdb = db.(storage.TSDB[*tsTable, option])
+       }
+
+       segments, err := tsdb.SelectSegments(*tqo.TimeRange)
+       if err != nil {
+               return nil, err
+       }
+       if len(segments) < 1 {
+               return nilResult, nil
+       }
+
+       result := queryResult{
+               ctx:           ctx,
+               segments:      segments,
+               tagProjection: tqo.TagProjection,
+       }
+       defer func() {
+               if err != nil {
+                       result.Release()
+               }
+       }()
+       var parts []*part
+       qo := queryOptions{
+               TraceQueryOptions: tqo,
+               traceID:           "",
+               minTimestamp:      tqo.TimeRange.Start.UnixNano(),
+               maxTimestamp:      tqo.TimeRange.End.UnixNano(),
+       }
+       var n int
+       tables := make([]*tsTable, 0)
+       for _, segment := range segments {
+               tt, _ := segment.Tables()
+               tables = append(tables, tt...)
+       }
+       for i := range tables {
+               s := tables[i].currentSnapshot()
+               if s == nil {
+                       continue
+               }
+               parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp)
+               if n < 1 {
+                       s.decRef()
+                       continue
+               }
+               result.snapshots = append(result.snapshots, s)
+       }
+
+       if err = t.searchBlocks(ctx, &result, parts, qo); err != nil {
+               return nil, err
+       }
+
+       return &result, nil
+}
+
+func (t *trace) searchBlocks(ctx context.Context, result *queryResult, parts 
[]*part, qo queryOptions) error {
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
+       defFn := startBlockScanSpan(ctx, qo.traceID, parts, result)
+       defer defFn()
+       tstIter := generateTstIter()
+       defer releaseTstIter(tstIter)
+       tstIter.init(bma, parts, qo.traceID)
+       if tstIter.Error() != nil {
+               return fmt.Errorf("cannot init tstIter: %w", tstIter.Error())
+       }
+       var hit int
+       var spanBlockBytes uint64
+       quota := t.pm.AvailableBytes()
+       for tstIter.nextBlock() {
+               if hit%checkDoneEvery == 0 {
+                       select {
+                       case <-ctx.Done():
+                               return errors.WithMessagef(ctx.Err(), 
"interrupt: scanned %d blocks, remained %d/%d parts to scan",
+                                       len(result.data), 
len(tstIter.piPool)-tstIter.idx, len(tstIter.piPool))
+                       default:
+                       }
+               }
+               hit++
+               bc := generateBlockCursor()
+               p := tstIter.piPool[tstIter.idx]
+               bc.init(p.p, p.curBlock, qo)
+               result.data = append(result.data, bc)
+               spanBlockBytes += bc.bm.uncompressedSpanSizeBytes
+               if quota >= 0 && spanBlockBytes > uint64(quota) {
+                       return fmt.Errorf("block scan quota exceeded: used %d 
bytes, quota is %d bytes", spanBlockBytes, quota)
+               }
+       }
+       if tstIter.Error() != nil {
+               return fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error())
+       }
+       return t.pm.AcquireResource(ctx, spanBlockBytes)
+}
+
+type queryResult struct {
+       ctx           context.Context
+       tagProjection *model.TagProjection
+       data          []*blockCursor
+       snapshots     []*snapshot
+       segments      []storage.Segment[*tsTable, option]
+       hit           int
+       loaded        bool
+}
+
+func (qr *queryResult) Pull() *model.TraceResult {
+       select {
+       case <-qr.ctx.Done():
+               return &model.TraceResult{
+                       Error: errors.WithMessagef(qr.ctx.Err(), "interrupt: 
hit %d", qr.hit),
+               }
+       default:
+       }
+       if !qr.loaded {
+               if len(qr.data) == 0 {
+                       return nil
+               }
+
+               cursorChan := make(chan int, len(qr.data))
+               for i := 0; i < len(qr.data); i++ {
+                       go func(i int) {
+                               select {
+                               case <-qr.ctx.Done():
+                                       cursorChan <- i
+                                       return
+                               default:
+                               }
+                               tmpBlock := generateBlock()
+                               defer releaseBlock(tmpBlock)
+                               if !qr.data[i].loadData(tmpBlock) {
+                                       cursorChan <- i
+                                       return
+                               }
+                               cursorChan <- -1
+                       }(i)
+               }
+
+               blankCursorList := []int{}
+               for completed := 0; completed < len(qr.data); completed++ {
+                       result := <-cursorChan
+                       if result != -1 {
+                               blankCursorList = append(blankCursorList, 
result)
+                       }
+               }
+               select {
+               case <-qr.ctx.Done():
+                       return &model.TraceResult{
+                               Error: errors.WithMessagef(qr.ctx.Err(), 
"interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)),
+                       }
+               default:
+               }
+               sort.Slice(blankCursorList, func(i, j int) bool {
+                       return blankCursorList[i] > blankCursorList[j]
+               })
+               for _, index := range blankCursorList {
+                       qr.data = append(qr.data[:index], qr.data[index+1:]...)
+               }
+               qr.loaded = true
+               heap.Init(qr)
+       }
+       if len(qr.data) == 0 {
+               return nil
+       }
+       if len(qr.data) == 1 {
+               r := &model.TraceResult{}
+               bc := qr.data[0]
+               bc.copyAllTo(r, false)
+               qr.data = qr.data[:0]
+               releaseBlockCursor(bc)
+               return r
+       }
+       return qr.merge()
+}
+
+func (qr *queryResult) Release() {
+       for i, v := range qr.data {
+               releaseBlockCursor(v)
+               qr.data[i] = nil
+       }
+       qr.data = qr.data[:0]
+       for i := range qr.snapshots {
+               qr.snapshots[i].decRef()
+       }
+       qr.snapshots = qr.snapshots[:0]
+       for i := range qr.segments {
+               qr.segments[i].DecRef()
+       }
+}
+
+func (qr queryResult) Len() int {
+       return len(qr.data)
+}
+
+func (qr queryResult) Less(i, j int) bool {
+       return qr.data[i].bm.traceID < qr.data[j].bm.traceID
+}
+
+func (qr queryResult) Swap(i, j int) {
+       qr.data[i], qr.data[j] = qr.data[j], qr.data[i]
+}
+
+func (qr *queryResult) Push(x interface{}) {
+       qr.data = append(qr.data, x.(*blockCursor))
+}
+
+func (qr *queryResult) Pop() interface{} {
+       old := qr.data
+       n := len(old)
+       x := old[n-1]
+       qr.data = old[0 : n-1]
+       releaseBlockCursor(x)
+       return x
+}
+
+func (qr *queryResult) merge() *model.TraceResult {
+       result := &model.TraceResult{}
+       var lastTraceID string
+
+       for qr.Len() > 0 {
+               topBC := qr.data[0]
+               if lastTraceID != "" && topBC.bm.traceID != lastTraceID {
+                       return result
+               }
+               lastTraceID = topBC.bm.traceID
+
+               topBC.copyTo(result)
+               topBC.idx++
+
+               if topBC.idx >= len(topBC.spans) {
+                       heap.Pop(qr)
+               }
+       }
+
+       return result
+}
+
 func mustEncodeTagValue(name string, tagType databasev1.TagType, tagValue 
*modelv1.TagValue, num int) [][]byte {
        values := make([][]byte, num)
        tv := encodeTagValue(name, tagType, tagValue)
diff --git a/banyand/trace/query_test.go b/banyand/trace/query_test.go
new file mode 100644
index 00000000..64569b46
--- /dev/null
+++ b/banyand/trace/query_test.go
@@ -0,0 +1,202 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "context"
+       "errors"
+       "testing"
+       "time"
+
+       "github.com/google/go-cmp/cmp"
+       "github.com/stretchr/testify/require"
+       "google.golang.org/protobuf/testing/protocmp"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
+)
+
+func TestQueryResult(t *testing.T) {
+       tests := []struct {
+               wantErr      error
+               name         string
+               tracesList   []*traces
+               traceID      string
+               want         []model.TraceResult
+               minTimestamp int64
+               maxTimestamp int64
+       }{
+               {
+                       name:         "Test with single trace data from tsTS1",
+                       tracesList:   []*traces{tsTS1},
+                       traceID:      "trace1",
+                       minTimestamp: 1,
+                       maxTimestamp: 1,
+                       want: []model.TraceResult{{
+                               Error: nil,
+                               TID:   "trace1",
+                               Tags: []model.Tag{
+                                       {Name: "strArrTag", Values: 
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
+                                       {Name: "strTag", Values: 
[]*modelv1.TagValue{strTagValue("value1")}},
+                                       {Name: "intTag", Values: 
[]*modelv1.TagValue{int64TagValue(10)}},
+                               },
+                               Spans: [][]byte{[]byte("span1")},
+                       }},
+               },
+               {
+                       name:         "Test with multiple trace data from tsTS1 
and tsTS2",
+                       tracesList:   []*traces{tsTS1, tsTS2},
+                       traceID:      "trace1",
+                       minTimestamp: 1,
+                       maxTimestamp: 2,
+                       want: []model.TraceResult{{
+                               Error: nil,
+                               TID:   "trace1",
+                               Tags: []model.Tag{
+                                       {Name: "strArrTag", Values: 
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"}), 
strArrTagValue([]string{"value5", "value6"})}},
+                                       {Name: "strTag", Values: 
[]*modelv1.TagValue{strTagValue("value1"), strTagValue("value4")}},
+                                       {Name: "intTag", Values: 
[]*modelv1.TagValue{int64TagValue(10), int64TagValue(40)}},
+                               },
+                               Spans: [][]byte{[]byte("span1"), 
[]byte("span4")},
+                       }},
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       verify := func(t *testing.T, tst *tsTable) {
+                               defer tst.Close()
+                               queryOpts := queryOptions{
+                                       minTimestamp: tt.minTimestamp,
+                                       maxTimestamp: tt.maxTimestamp,
+                               }
+                               s := tst.currentSnapshot()
+                               require.NotNil(t, s)
+                               defer s.decRef()
+                               pp, _ := s.getParts(nil, 
queryOpts.minTimestamp, queryOpts.maxTimestamp)
+                               bma := generateBlockMetadataArray()
+                               defer releaseBlockMetadataArray(bma)
+                               ti := &tstIter{}
+                               ti.init(bma, pp, tt.traceID)
+
+                               var result queryResult
+                               result.ctx = context.TODO()
+                               // Query all tags
+                               result.tagProjection = allTagProjections
+                               for ti.nextBlock() {
+                                       bc := generateBlockCursor()
+                                       p := ti.piPool[ti.idx]
+                                       opts := queryOpts
+                                       opts.TagProjection = allTagProjections
+                                       bc.init(p.p, p.curBlock, opts)
+                                       result.data = append(result.data, bc)
+                               }
+                               defer result.Release()
+
+                               var got []model.TraceResult
+                               for {
+                                       r := result.Pull()
+                                       if r == nil {
+                                               break
+                                       }
+                                       got = append(got, *r)
+                               }
+
+                               if !errors.Is(ti.Error(), tt.wantErr) {
+                                       t.Errorf("Unexpected error: got %v, 
want %v", ti.err, tt.wantErr)
+                               }
+
+                               if diff := cmp.Diff(got, tt.want,
+                                       protocmp.IgnoreUnknown(), 
protocmp.Transform()); diff != "" {
+                                       t.Errorf("Unexpected []pbv1.Result 
(-got +want):\n%s", diff)
+                               }
+                       }
+
+                       t.Run("memory snapshot", func(t *testing.T) {
+                               tmpPath, defFn := test.Space(require.New(t))
+                               defer defFn()
+                               tst := &tsTable{
+                                       loopCloser:    run.NewCloser(2),
+                                       introductions: make(chan *introduction),
+                                       fileSystem:    fs.NewLocalFileSystem(),
+                                       root:          tmpPath,
+                               }
+                               tst.gc.init(tst)
+                               flushCh := make(chan *flusherIntroduction)
+                               mergeCh := make(chan *mergerIntroduction)
+                               introducerWatcher := make(watcher.Channel, 1)
+                               go tst.introducerLoop(flushCh, mergeCh, 
introducerWatcher, 1)
+                               for _, traces := range tt.tracesList {
+                                       tst.mustAddTraces(traces)
+                                       time.Sleep(100 * time.Millisecond)
+                               }
+                               verify(t, tst)
+                       })
+
+                       t.Run("file snapshot", func(t *testing.T) {
+                               // Initialize a tstIter object.
+                               tmpPath, defFn := test.Space(require.New(t))
+                               fileSystem := fs.NewLocalFileSystem()
+                               defer defFn()
+                               tst, err := newTSTable(fileSystem, tmpPath, 
common.Position{},
+                                       logger.GetLogger("test"), 
timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: 
newDefaultMergePolicyForTesting(), protector: protector.Nop{}}, nil)
+                               require.NoError(t, err)
+                               for _, traces := range tt.tracesList {
+                                       tst.mustAddTraces(traces)
+                                       time.Sleep(100 * time.Millisecond)
+                               }
+                               // wait until the introducer is done
+                               if len(tt.tracesList) > 0 {
+                                       for {
+                                               snp := tst.currentSnapshot()
+                                               if snp == nil {
+                                                       time.Sleep(100 * 
time.Millisecond)
+                                                       continue
+                                               }
+                                               if snp.creator == 
snapshotCreatorMemPart {
+                                                       snp.decRef()
+                                                       time.Sleep(100 * 
time.Millisecond)
+                                                       continue
+                                               }
+                                               snp.decRef()
+                                               tst.Close()
+                                               break
+                                       }
+                               }
+
+                               // reopen the table
+                               tst, err = newTSTable(fileSystem, tmpPath, 
common.Position{},
+                                       logger.GetLogger("test"), 
timestamp.TimeRange{}, option{
+                                               flushTimeout: 
defaultFlushTimeout, mergePolicy: newDefaultMergePolicyForTesting(),
+                                               protector: protector.Nop{},
+                                       }, nil)
+                               require.NoError(t, err)
+
+                               verify(t, tst)
+                       })
+               })
+       }
+}
diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go
index 243ebd13..3bbded02 100644
--- a/banyand/trace/snapshot.go
+++ b/banyand/trace/snapshot.go
@@ -67,13 +67,14 @@ type snapshot struct {
        ref int32
 }
 
-func (s *snapshot) getParts(dst []*part, minTimestamp, maxTimestamp int64) 
([]*part, int) {
+func (s *snapshot) getParts(dst []*part, minTimestamp int64, maxTimestamp 
int64) ([]*part, int) {
        var count int
        for _, p := range s.parts {
                pm := p.p.partMetadata
                if maxTimestamp < pm.MinTimestamp || minTimestamp > 
pm.MaxTimestamp {
                        continue
                }
+               // TODO: filter parts
                dst = append(dst, p.p)
                count++
        }
diff --git a/banyand/trace/tracing.go b/banyand/trace/tracing.go
index 84891858..9c996424 100644
--- a/banyand/trace/tracing.go
+++ b/banyand/trace/tracing.go
@@ -23,11 +23,14 @@ import (
        "time"
 
        "github.com/dustin/go-humanize"
+
+       "github.com/apache/skywalking-banyandb/pkg/query"
 )
 
+// TODO: check the header.
 const (
        partMetadataHeader = "MinTimestamp, MaxTimestamp, CompressionSize, 
UncompressedSize, TotalCount, BlocksCount"
-       blockHeader        = "PartID, SeriesID, MinTimestamp, MaxTimestamp, 
Count, UncompressedSize"
+       blockHeader        = "PartID, TraceID, Count, UncompressedSize"
 )
 
 func (pm *partMetadata) String() string {
@@ -45,6 +48,26 @@ func (bc *blockCursor) String() string {
                bc.p.partMetadata.ID, bc.bm.traceID, bc.bm.count, 
humanize.Bytes(bc.bm.uncompressedSpanSizeBytes))
 }
 
-func startBlockScanSpan(_ context.Context, _ int, _ []*part) func() {
-       panic("unimplemented")
+func startBlockScanSpan(ctx context.Context, tid string, parts []*part, qr 
*queryResult) func() {
+       tracer := query.GetTracer(ctx)
+       if tracer == nil {
+               return func() {}
+       }
+
+       span, _ := tracer.StartSpan(ctx, "scan-blocks")
+       span.Tag("trace_id", tid)
+       span.Tag("part_header", partMetadataHeader)
+       span.Tag("part_num", fmt.Sprintf("%d", len(parts)))
+       for i := range parts {
+               span.Tag(fmt.Sprintf("part_%d_%s", parts[i].partMetadata.ID, 
parts[i].path),
+                       parts[i].partMetadata.String())
+       }
+
+       return func() {
+               span.Tag("block_header", blockHeader)
+               for i := range qr.data {
+                       span.Tag(fmt.Sprintf("block_%d", i), 
qr.data[i].String())
+               }
+               span.Stop()
+       }
 }
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index cd0b1075..7443831b 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -18,7 +18,6 @@
 package trace
 
 import (
-       "container/heap"
        "encoding/json"
        "errors"
        "fmt"
@@ -310,11 +309,10 @@ func (tst *tsTable) mustAddTraces(ts *traces) {
 }
 
 type tstIter struct {
-       err           error
-       parts         []*part
-       piPool        []partIter
-       piHeap        partIterHeap
-       nextBlockNoop bool
+       err    error
+       parts  []*part
+       piPool []*partIter
+       idx    int
 }
 
 func (ti *tstIter) reset() {
@@ -328,55 +326,33 @@ func (ti *tstIter) reset() {
        }
        ti.piPool = ti.piPool[:0]
 
-       for i := range ti.piHeap {
-               ti.piHeap[i] = nil
-       }
-       ti.piHeap = ti.piHeap[:0]
-
        ti.err = nil
-       ti.nextBlockNoop = false
+       ti.idx = 0
 }
 
-func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, tids []string, 
minTimestamp, maxTimestamp int64) {
+func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, tid string) {
        ti.reset()
        ti.parts = parts
 
        if n := len(ti.parts) - cap(ti.piPool); n > 0 {
-               ti.piPool = append(ti.piPool[:cap(ti.piPool)], make([]partIter, 
n)...)
+               ti.piPool = append(ti.piPool[:cap(ti.piPool)], 
make([]*partIter, n)...)
        }
        ti.piPool = ti.piPool[:len(ti.parts)]
        for i, p := range ti.parts {
-               ti.piPool[i].init(bma, p, tids, minTimestamp, maxTimestamp)
+               ti.piPool[i] = &partIter{}
+               ti.piPool[i].init(bma, p, tid)
        }
 
-       ti.piHeap = ti.piHeap[:0]
-       for i := range ti.piPool {
-               ps := &ti.piPool[i]
-               if !ps.nextBlock() {
-                       if err := ps.error(); err != nil {
-                               ti.err = fmt.Errorf("cannot initialize tsTable 
iteration: %w", err)
-                               return
-                       }
-                       continue
-               }
-               ti.piHeap = append(ti.piHeap, ps)
-       }
-       if len(ti.piHeap) == 0 {
+       if len(ti.piPool) == 0 {
                ti.err = io.EOF
                return
        }
-       heap.Init(&ti.piHeap)
-       ti.nextBlockNoop = true
 }
 
 func (ti *tstIter) nextBlock() bool {
        if ti.err != nil {
                return false
        }
-       if ti.nextBlockNoop {
-               ti.nextBlockNoop = false
-               return true
-       }
 
        ti.err = ti.next()
        if ti.err != nil {
@@ -389,22 +365,17 @@ func (ti *tstIter) nextBlock() bool {
 }
 
 func (ti *tstIter) next() error {
-       psMin := ti.piHeap[0]
-       if psMin.nextBlock() {
-               heap.Fix(&ti.piHeap, 0)
-               return nil
-       }
-
-       if err := psMin.error(); err != nil {
-               return err
-       }
-
-       heap.Pop(&ti.piHeap)
-
-       if len(ti.piHeap) == 0 {
-               return io.EOF
+       for ti.idx < len(ti.piPool) {
+               pi := ti.piPool[ti.idx]
+               if pi.nextBlock() {
+                       return nil
+               }
+               if err := pi.error(); err != nil {
+                       return err
+               }
+               ti.idx++
        }
-       return nil
+       return io.EOF
 }
 
 func (ti *tstIter) Error() error {
@@ -428,30 +399,3 @@ func releaseTstIter(ti *tstIter) {
 }
 
 var tstIterPool = pool.Register[*tstIter]("trace-tstIter")
-
-type partIterHeap []*partIter
-
-func (pih *partIterHeap) Len() int {
-       return len(*pih)
-}
-
-func (pih *partIterHeap) Less(i, j int) bool {
-       x := *pih
-       return x[i].curBlock.less(x[j].curBlock)
-}
-
-func (pih *partIterHeap) Swap(i, j int) {
-       x := *pih
-       x[i], x[j] = x[j], x[i]
-}
-
-func (pih *partIterHeap) Push(x any) {
-       *pih = append(*pih, x.(*partIter))
-}
-
-func (pih *partIterHeap) Pop() any {
-       a := *pih
-       v := a[len(a)-1]
-       *pih = a[:len(a)-1]
-       return v
-}
diff --git a/banyand/trace/tstable_test.go b/banyand/trace/tstable_test.go
index 169b101f..2ee0c4db 100644
--- a/banyand/trace/tstable_test.go
+++ b/banyand/trace/tstable_test.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -115,10 +116,10 @@ func Test_tsTable_mustAddTraces(t *testing.T) {
 
 func Test_tstIter(t *testing.T) {
        type testCtx struct {
+               tsList       []*traces
                wantErr      error
                name         string
-               tsList       []*traces
-               tids         []string
+               tid          string
                want         []blockMetadata
                minTimestamp int64
                maxTimestamp int64
@@ -136,14 +137,14 @@ func Test_tstIter(t *testing.T) {
                pp, n := s.getParts(nil, tt.minTimestamp, tt.maxTimestamp)
                require.Equal(t, len(s.parts), n)
                ti := &tstIter{}
-               ti.init(bma, pp, tt.tids, tt.minTimestamp, tt.maxTimestamp)
+               ti.init(bma, pp, tt.tid)
                var got []blockMetadata
                for ti.nextBlock() {
-                       if ti.piHeap[0].curBlock.traceID == "" {
+                       if ti.piPool[ti.idx].curBlock.traceID == "" {
                                t.Errorf("Expected curBlock to be initialized, 
but it was nil")
                        }
                        var bm blockMetadata
-                       bm.copyFrom(ti.piHeap[0].curBlock)
+                       bm.copyFrom(ti.piPool[ti.idx].curBlock)
                        got = append(got, bm)
                }
 
@@ -166,37 +167,28 @@ func Test_tstIter(t *testing.T) {
        t.Run("memory snapshot", func(t *testing.T) {
                tests := []testCtx{
                        {
-                               name:         "Test with no traces",
-                               tsList:       []*traces{},
-                               tids:         []string{"trace1", "trace2", 
"trace3"},
-                               minTimestamp: 1,
-                               maxTimestamp: 1,
+                               name:   "Test with no traces",
+                               tsList: []*traces{},
                        },
                        {
                                name:         "Test with single part",
                                tsList:       []*traces{tsTS1},
-                               tids:         []string{"trace1", "trace2", 
"trace3"},
+                               tid:          "trace1",
                                minTimestamp: 1,
-                               maxTimestamp: 1,
+                               maxTimestamp: 2,
                                want: []blockMetadata{
                                        {traceID: "trace1", count: 1, 
uncompressedSpanSizeBytes: 5},
-                                       {traceID: "trace2", count: 1, 
uncompressedSpanSizeBytes: 5},
-                                       {traceID: "trace3", count: 1, 
uncompressedSpanSizeBytes: 5},
                                },
                        },
                        {
                                name:         "Test with multiple parts",
                                tsList:       []*traces{tsTS1, tsTS2},
-                               tids:         []string{"trace1", "trace2", 
"trace3"},
+                               tid:          "trace1",
                                minTimestamp: 1,
                                maxTimestamp: 2,
                                want: []blockMetadata{
                                        {traceID: "trace1", count: 1, 
uncompressedSpanSizeBytes: 5},
                                        {traceID: "trace1", count: 1, 
uncompressedSpanSizeBytes: 5},
-                                       {traceID: "trace2", count: 1, 
uncompressedSpanSizeBytes: 5},
-                                       {traceID: "trace2", count: 1, 
uncompressedSpanSizeBytes: 5},
-                                       {traceID: "trace3", count: 1, 
uncompressedSpanSizeBytes: 5},
-                                       {traceID: "trace3", count: 1, 
uncompressedSpanSizeBytes: 5},
                                },
                        },
                }
@@ -225,6 +217,10 @@ func Test_tstIter(t *testing.T) {
        })
 }
 
+var allTagProjections = &model.TagProjection{
+       Names: []string{"strArrTag", "strTag", "intTag"},
+}
+
 var tsTS1 = &traces{
        traceIDs:   []string{"trace1", "trace2", "trace3"},
        timestamps: []int64{1, 1, 1},
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index e0107d86..f3349d06 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -348,7 +348,13 @@ func (t *TraceQueryOptions) CopyFrom(other 
*TraceQueryOptions) {
 // TraceResult is the result of a query.
 type TraceResult struct {
        Error error
-       Tags  []Tag
        Spans [][]byte
-       TIDs  []string
+       TID   string
+       Tags  []Tag
+}
+
+// TraceQueryResult is the result of a trace query.
+type TraceQueryResult interface {
+       Pull() *TraceResult
+       Release()
 }


Reply via email to