This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 27c656d8 Implement traceID-based query (#735)
27c656d8 is described below
commit 27c656d8ebb95fd5b0823110c42507efb52a4261
Author: Huang Youliang <[email protected]>
AuthorDate: Tue Aug 26 22:19:09 2025 +0800
Implement traceID-based query (#735)
* Implement traceID-based query
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 4 +
banyand/trace/block.go | 10 +-
banyand/trace/block_writer.go | 1 -
banyand/trace/merger.go | 30 ++++-
banyand/trace/part_iter.go | 68 +++--------
banyand/trace/query.go | 277 ++++++++++++++++++++++++++++++++++++++++--
banyand/trace/query_test.go | 202 ++++++++++++++++++++++++++++++
banyand/trace/snapshot.go | 3 +-
banyand/trace/tracing.go | 29 ++++-
banyand/trace/tstable.go | 96 +++------------
banyand/trace/tstable_test.go | 34 +++---
pkg/query/model/model.go | 10 +-
12 files changed, 588 insertions(+), 176 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ae00c13d..446f62c6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -33,6 +33,10 @@ Release Notes.
- Enhance flusher and introducer loops to support merging operations,
improving efficiency by eliminating the need for a separate merge loop and
optimizing data handling process during flushing and merging
- Enhance stream synchronization with configurable sync interval - Allows
customization of synchronization timing for better performance tuning
- Refactor flusher and introducer loops to support conditional merging -
Optimizes data processing by adding conditional logic to merge operations
+- New storage engine for trace:
+ - Data ingestion and retrieval.
+ - Flush in-memory data to disk.
+ - Merge in-memory and on-disk data.
### Bug Fixes
diff --git a/banyand/trace/block.go b/banyand/trace/block.go
index 404bb158..1fc28c26 100644
--- a/banyand/trace/block.go
+++ b/banyand/trace/block.go
@@ -288,6 +288,7 @@ func mustReadSpansFrom(spans [][]byte, sm *dataBlock, count
int, reader fs.Reade
if uint64(len(src)) < spanLen {
logger.Panicf("insufficient data for span: need %d
bytes, have %d", spanLen, len(src))
}
+ spans[i] = spans[i][:0]
spans[i] = append(spans[i], src[:spanLen]...)
src = src[spanLen:]
}
@@ -311,6 +312,7 @@ func mustSeqReadSpansFrom(spans [][]byte, sm *dataBlock,
count int, reader *seqR
if uint64(len(src)) < spanLen {
logger.Panicf("insufficient data for span: need %d
bytes, have %d", spanLen, len(src))
}
+ spans[i] = spans[i][:0]
spans[i] = append(spans[i], src[:spanLen]...)
src = src[spanLen:]
}
@@ -381,11 +383,7 @@ func (bc *blockCursor) copyAllTo(r *model.TraceResult,
desc bool) {
return
}
- requiredCapacity := end - start
- r.TIDs = append(r.TIDs, make([]string, requiredCapacity)...)
- for i := range r.TIDs[len(r.TIDs)-requiredCapacity:] {
- r.TIDs[len(r.TIDs)-requiredCapacity+i] = bc.bm.traceID
- }
+ r.TID = bc.bm.traceID
r.Spans = append(r.Spans, bc.spans[start:end]...)
if desc {
@@ -417,7 +415,7 @@ func (bc *blockCursor) copyAllTo(r *model.TraceResult, desc
bool) {
func (bc *blockCursor) copyTo(r *model.TraceResult) {
r.Spans = append(r.Spans, bc.spans[bc.idx])
- r.TIDs = append(r.TIDs, bc.bm.traceID)
+ r.TID = bc.bm.traceID
if len(r.Tags) != len(bc.tagProjection.Names) {
for _, name := range bc.tagProjection.Names {
r.Tags = append(r.Tags, model.Tag{Name: name})
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index 9927dbe6..f7c46c77 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -270,7 +270,6 @@ func (bw *blockWriter) Flush(pm *partMetadata, tf
*traceIDFilter, tt *tagType) {
pm.UncompressedSpanSizeBytes = bw.totalUncompressedSpanSizeBytes
pm.TotalCount = bw.totalCount
pm.BlocksCount = bw.totalBlocksCount
- // TODO: update timestamp metadata when merging blocks
pm.MinTimestamp = bw.totalMinTimestamp
pm.MaxTimestamp = bw.totalMaxTimestamp
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 89a8fb98..d9dcb3d2 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -260,7 +260,24 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem,
closeCh <-chan struct{}
}
}
- pm, err := mergeBlocks(closeCh, bw, br)
+ // The merged part covers the union of its source parts' time ranges.
+ var minTimestamp, maxTimestamp int64
+ for i, pw := range parts {
+ pm := pw.p.partMetadata
+ if i == 0 {
+ minTimestamp = pm.MinTimestamp
+ maxTimestamp = pm.MaxTimestamp
+ continue
+ }
+ if pm.MinTimestamp < minTimestamp {
+ minTimestamp = pm.MinTimestamp
+ }
+ if pm.MaxTimestamp > maxTimestamp {
+ maxTimestamp = pm.MaxTimestamp
+ }
+ }
+
+ pm, tf, tt, err := mergeBlocks(closeCh, bw, br)
releaseBlockWriter(bw)
releaseBlockReader(br)
for i := range pii {
@@ -270,6 +287,12 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem,
closeCh <-chan struct{}
return nil, err
}
+ // mergeBlocks returns nil metadata on error, so the time range may only be
+ // stamped once the error check above has passed.
+ pm.MinTimestamp = minTimestamp
+ pm.MaxTimestamp = maxTimestamp
pm.mustWriteMetadata(fileSystem, dstPath)
+ tf.mustWriteTraceIDFilter(fileSystem, dstPath)
+ tt.mustWriteTagType(fileSystem, dstPath)
fileSystem.SyncPath(dstPath)
p := mustOpenFilePart(partID, root, fileSystem)
@@ -278,7 +298,7 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem,
closeCh <-chan struct{}
var errClosed = fmt.Errorf("the merger is closed")
-func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader)
(*partMetadata, error) {
+func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader)
(*partMetadata, *traceIDFilter, *tagType, error) {
pendingBlockIsEmpty := true
pendingBlock := generateBlockPointer()
defer releaseBlockPointer(pendingBlock)
@@ -299,7 +319,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter,
br *blockReader) (*pa
for br.nextBlockMetadata() {
select {
case <-closeCh:
- return nil, errClosed
+ return nil, nil, nil, errClosed
default:
}
b := br.block
@@ -342,7 +362,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter,
br *blockReader) (*pa
pendingBlockIsEmpty = true
}
if err := br.error(); err != nil {
- return nil, fmt.Errorf("cannot read block to merge: %w", err)
+ return nil, nil, nil, fmt.Errorf("cannot read block to merge:
%w", err)
}
if !pendingBlockIsEmpty {
bw.mustWriteBlock(pendingBlock.bm.traceID, &pendingBlock.block)
@@ -352,7 +372,7 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter,
br *blockReader) (*pa
var tf traceIDFilter
tt := make(tagType)
bw.Flush(&pm, &tf, &tt)
- return &pm, nil
+ return &pm, &tf, &tt, nil
}
func mergeTwoBlocks(target, left, right *blockPointer) {
diff --git a/banyand/trace/part_iter.go b/banyand/trace/part_iter.go
index fc36f291..9982b818 100644
--- a/banyand/trace/part_iter.go
+++ b/banyand/trace/part_iter.go
@@ -36,21 +36,17 @@ type partIter struct {
err error
p *part
curBlock *blockMetadata
- tids []string
+ tid string
primaryBlockMetadata []primaryBlockMetadata
bms []blockMetadata
compressedPrimaryBuf []byte
primaryBuf []byte
- tidIdx int
- minTimestamp int64
- maxTimestamp int64
}
func (pi *partIter) reset() {
pi.curBlock = nil
pi.p = nil
- pi.tids = nil
- pi.tidIdx = 0
+ pi.tid = ""
pi.primaryBlockMetadata = nil
pi.bms = nil
pi.compressedPrimaryBuf = pi.compressedPrimaryBuf[:0]
@@ -58,19 +54,17 @@ func (pi *partIter) reset() {
pi.err = nil
}
-func (pi *partIter) init(bma *blockMetadataArray, p *part, tids []string,
minTimestamp, maxTimestamp int64) {
+func (pi *partIter) init(bma *blockMetadataArray, p *part, tid string) {
pi.reset()
pi.curBlock = &blockMetadata{}
pi.p = p
pi.bms = bma.arr
- pi.tids = tids
- pi.minTimestamp = minTimestamp
- pi.maxTimestamp = maxTimestamp
+ pi.tid = tid
pi.primaryBlockMetadata = p.primaryBlockMetadata
- pi.nextTraceID()
+ pi.curBlock.traceID = tid
}
func (pi *partIter) nextBlock() bool {
@@ -96,47 +90,14 @@ func (pi *partIter) error() error {
return pi.err
}
-func (pi *partIter) nextTraceID() bool {
- if pi.tidIdx >= len(pi.tids) {
- pi.err = io.EOF
- return false
- }
- pi.curBlock.traceID = pi.tids[pi.tidIdx]
- pi.tidIdx++
- return true
-}
-
-func (pi *partIter) searchTargetTraceID(tid string) bool {
- if pi.curBlock.traceID >= tid {
- return true
- }
- if !pi.nextTraceID() {
- return false
- }
- if pi.curBlock.traceID >= tid {
- return true
- }
- tids := pi.tids[pi.tidIdx:]
- pi.tidIdx += sort.Search(len(tids), func(i int) bool {
- return tid <= tids[i]
- })
- if pi.tidIdx >= len(pi.tids) {
- pi.tidIdx = len(pi.tids)
- pi.err = io.EOF
- return false
- }
- pi.curBlock.traceID = pi.tids[pi.tidIdx]
- pi.tidIdx++
- return true
-}
-
func (pi *partIter) loadNextBlockMetadata() bool {
if len(pi.primaryBlockMetadata) > 0 {
- if !pi.searchTargetTraceID(pi.primaryBlockMetadata[0].traceID) {
+ if pi.curBlock.traceID < pi.primaryBlockMetadata[0].traceID {
+ pi.err = io.EOF
return false
}
- pi.primaryBlockMetadata = searchPBM(pi.primaryBlockMetadata,
pi.curBlock.traceID)
+ pi.primaryBlockMetadata = searchPBM(pi.primaryBlockMetadata,
pi.curBlock.traceID)
pbm := &pi.primaryBlockMetadata[0]
pi.primaryBlockMetadata = pi.primaryBlockMetadata[1:]
if pi.curBlock.traceID < pbm.traceID {
@@ -193,24 +154,23 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata,
mr *primaryBlockMetada
func (pi *partIter) findBlock() bool {
bhs := pi.bms
- for len(bhs) > 0 {
+ if len(bhs) > 0 {
tid := pi.curBlock.traceID
if bhs[0].traceID < tid {
n := sort.Search(len(bhs), func(i int) bool {
return tid <= bhs[i].traceID
})
if n == len(bhs) {
- break
+ pi.bms = nil
+ return false
}
bhs = bhs[n:]
}
bm := &bhs[0]
- if bm.traceID != tid {
- if !pi.searchTargetTraceID(bm.traceID) {
- return false
- }
- continue
+ if bm.traceID > tid {
+ pi.bms = bhs[:0]
+ return false
}
pi.curBlock = bm
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 503fb2ec..df81f15c 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -18,39 +18,298 @@
package trace
import (
- "github.com/apache/skywalking-banyandb/api/common"
+ "container/heap"
+ "context"
+ "fmt"
+ "sort"
+
+ "github.com/pkg/errors"
+
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
)
+const checkDoneEvery = 128
+
+var nilResult = model.TraceQueryResult(nil)
+
type queryOptions struct {
+ traceID string
model.TraceQueryOptions
- seriesToEntity map[common.SeriesID][]*modelv1.TagValue
- sortedSids []common.SeriesID
- minTimestamp int64
- maxTimestamp int64
+ minTimestamp int64
+ maxTimestamp int64
}
func (qo *queryOptions) reset() {
qo.TraceQueryOptions.Reset()
- qo.seriesToEntity = nil
- qo.sortedSids = nil
+ qo.traceID = ""
qo.minTimestamp = 0
qo.maxTimestamp = 0
}
func (qo *queryOptions) copyFrom(other *queryOptions) {
qo.TraceQueryOptions.CopyFrom(&other.TraceQueryOptions)
- qo.seriesToEntity = other.seriesToEntity
- qo.sortedSids = other.sortedSids
+ qo.traceID = other.traceID
qo.minTimestamp = other.minTimestamp
qo.maxTimestamp = other.maxTimestamp
}
+// Query looks up trace data within tqo.TimeRange and returns a lazily
+// materialized result. TimeRange and a non-empty TagProjection are required.
+// Parts are gathered from the current snapshot of every table in the selected
+// segments; if the block search fails, the partially built result is released.
+// On success the returned result holds the snapshot and segment references
+// until its Release method is called.
+func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions)
(model.TraceQueryResult, error) {
+ if tqo.TimeRange == nil {
+ return nil, errors.New("invalid query options: timeRange are
required")
+ }
+ if tqo.TagProjection == nil || len(tqo.TagProjection.Names) == 0 {
+ return nil, errors.New("invalid query options: tagProjection is
required")
+ }
+ // Load the TSDB on first use and cache it for subsequent queries.
+ var tsdb storage.TSDB[*tsTable, option]
+ var err error
+ db := t.tsdb.Load()
+ if db == nil {
+ tsdb, err = t.schemaRepo.loadTSDB(t.group)
+ if err != nil {
+ return nil, err
+ }
+ t.tsdb.Store(tsdb)
+ } else {
+ tsdb = db.(storage.TSDB[*tsTable, option])
+ }
+
+ segments, err := tsdb.SelectSegments(*tqo.TimeRange)
+ if err != nil {
+ return nil, err
+ }
+ if len(segments) < 1 {
+ return nilResult, nil
+ }
+
+ result := queryResult{
+ ctx: ctx,
+ segments: segments,
+ tagProjection: tqo.TagProjection,
+ }
+ // Any error assigned to err from here on releases the partial result
+ // (cursors, snapshot refs, segment refs) before returning.
+ defer func() {
+ if err != nil {
+ result.Release()
+ }
+ }()
+ var parts []*part
+ qo := queryOptions{
+ TraceQueryOptions: tqo,
+ // NOTE(review): traceID is always empty here. partIter.init copies it
+ // into curBlock.traceID, and loadNextBlockMetadata stops with io.EOF when
+ // that ID sorts before the first primary block's trace ID, so this query
+ // likely matches no blocks for non-empty trace IDs. TODO: populate it
+ // from the request once the options carry a trace ID.
+ traceID: "",
+ minTimestamp: tqo.TimeRange.Start.UnixNano(),
+ maxTimestamp: tqo.TimeRange.End.UnixNano(),
+ }
+ var n int
+ tables := make([]*tsTable, 0)
+ for _, segment := range segments {
+ tt, _ := segment.Tables()
+ tables = append(tables, tt...)
+ }
+ // Keep a snapshot reference only when it contributed at least one part
+ // overlapping the time range; otherwise drop it immediately.
+ for i := range tables {
+ s := tables[i].currentSnapshot()
+ if s == nil {
+ continue
+ }
+ parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp)
+ if n < 1 {
+ s.decRef()
+ continue
+ }
+ result.snapshots = append(result.snapshots, s)
+ }
+
+ if err = t.searchBlocks(ctx, &result, parts, qo); err != nil {
+ return nil, err
+ }
+
+ return &result, nil
+}
+
+// searchBlocks walks every block of parts that matches qo.traceID and appends
+// one blockCursor per block to result.data. It checks for context
+// cancellation every checkDoneEvery blocks, fails once the accumulated
+// uncompressed span size exceeds the protector's available bytes (when that
+// value is non-negative), and finally reserves the accumulated size via
+// AcquireResource. The scan is recorded in a tracing span when a tracer is
+// present in ctx.
+func (t *trace) searchBlocks(ctx context.Context, result *queryResult, parts
[]*part, qo queryOptions) error {
+ bma := generateBlockMetadataArray()
+ defer releaseBlockMetadataArray(bma)
+ defFn := startBlockScanSpan(ctx, qo.traceID, parts, result)
+ defer defFn()
+ tstIter := generateTstIter()
+ defer releaseTstIter(tstIter)
+ tstIter.init(bma, parts, qo.traceID)
+ if tstIter.Error() != nil {
+ return fmt.Errorf("cannot init tstIter: %w", tstIter.Error())
+ }
+ var hit int
+ var spanBlockBytes uint64
+ // NOTE(review): a negative value appears to mean "no quota" — confirm
+ // against the protector's AvailableBytes contract.
+ quota := t.pm.AvailableBytes()
+ for tstIter.nextBlock() {
+ // Poll for cancellation periodically rather than on every block.
+ if hit%checkDoneEvery == 0 {
+ select {
+ case <-ctx.Done():
+ return errors.WithMessagef(ctx.Err(),
"interrupt: scanned %d blocks, remained %d/%d parts to scan",
+ len(result.data),
len(tstIter.piPool)-tstIter.idx, len(tstIter.piPool))
+ default:
+ }
+ }
+ hit++
+ // The current block belongs to the part iterator at tstIter.idx.
+ bc := generateBlockCursor()
+ p := tstIter.piPool[tstIter.idx]
+ bc.init(p.p, p.curBlock, qo)
+ result.data = append(result.data, bc)
+ spanBlockBytes += bc.bm.uncompressedSpanSizeBytes
+ if quota >= 0 && spanBlockBytes > uint64(quota) {
+ return fmt.Errorf("block scan quota exceeded: used %d
bytes, quota is %d bytes", spanBlockBytes, quota)
+ }
+ }
+ if tstIter.Error() != nil {
+ return fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error())
+ }
+ return t.pm.AcquireResource(ctx, spanBlockBytes)
+}
+
+type queryResult struct {
+ ctx context.Context
+ tagProjection *model.TagProjection
+ data []*blockCursor
+ snapshots []*snapshot
+ segments []storage.Segment[*tsTable, option]
+ hit int
+ loaded bool
+}
+
+// Pull returns the next merged trace result, or nil when no data remains.
+// On the first call it loads every cursor's data concurrently (one goroutine
+// per cursor), drops cursors whose load failed or was cancelled, and heapifies
+// the rest by trace ID. A single remaining cursor is copied out wholesale;
+// otherwise results are produced by merge. Context cancellation is reported
+// as a TraceResult carrying Error rather than as a nil result.
+func (qr *queryResult) Pull() *model.TraceResult {
+ select {
+ case <-qr.ctx.Done():
+ // NOTE(review): qr.hit is reported here, but no code in this diff
+ // increments it — confirm whether it is updated elsewhere.
+ return &model.TraceResult{
+ Error: errors.WithMessagef(qr.ctx.Err(), "interrupt:
hit %d", qr.hit),
+ }
+ default:
+ }
+ if !qr.loaded {
+ if len(qr.data) == 0 {
+ return nil
+ }
+
+ // Buffered to len(qr.data) so no loader goroutine ever blocks on send.
+ // Each goroutine sends its index when the cursor is blank, or -1 on success.
+ cursorChan := make(chan int, len(qr.data))
+ for i := 0; i < len(qr.data); i++ {
+ go func(i int) {
+ select {
+ case <-qr.ctx.Done():
+ cursorChan <- i
+ return
+ default:
+ }
+ tmpBlock := generateBlock()
+ defer releaseBlock(tmpBlock)
+ if !qr.data[i].loadData(tmpBlock) {
+ cursorChan <- i
+ return
+ }
+ cursorChan <- -1
+ }(i)
+ }
+
+ blankCursorList := []int{}
+ for completed := 0; completed < len(qr.data); completed++ {
+ result := <-cursorChan
+ if result != -1 {
+ blankCursorList = append(blankCursorList,
result)
+ }
+ }
+ select {
+ case <-qr.ctx.Done():
+ return &model.TraceResult{
+ Error: errors.WithMessagef(qr.ctx.Err(),
"interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)),
+ }
+ default:
+ }
+ // Remove blank cursors from the highest index down so earlier indices
+ // stay valid while the slice shrinks.
+ // NOTE(review): removed cursors are not passed to releaseBlockCursor.
+ sort.Slice(blankCursorList, func(i, j int) bool {
+ return blankCursorList[i] > blankCursorList[j]
+ })
+ for _, index := range blankCursorList {
+ qr.data = append(qr.data[:index], qr.data[index+1:]...)
+ }
+ qr.loaded = true
+ heap.Init(qr)
+ }
+ if len(qr.data) == 0 {
+ return nil
+ }
+ // Fast path: a single cursor needs no merging.
+ if len(qr.data) == 1 {
+ r := &model.TraceResult{}
+ bc := qr.data[0]
+ bc.copyAllTo(r, false)
+ qr.data = qr.data[:0]
+ releaseBlockCursor(bc)
+ return r
+ }
+ return qr.merge()
+}
+
+func (qr *queryResult) Release() {
+ for i, v := range qr.data {
+ releaseBlockCursor(v)
+ qr.data[i] = nil
+ }
+ qr.data = qr.data[:0]
+ for i := range qr.snapshots {
+ qr.snapshots[i].decRef()
+ }
+ qr.snapshots = qr.snapshots[:0]
+ for i := range qr.segments {
+ qr.segments[i].DecRef()
+ }
+}
+
+// Len, Less, Swap, Push and Pop let *queryResult act as a container/heap
+// of block cursors ordered by trace ID.
+func (qr queryResult) Len() int {
+ return len(qr.data)
+}
+
+// Less orders cursors by their block's trace ID only; the span index is not
+// part of the key.
+func (qr queryResult) Less(i, j int) bool {
+ return qr.data[i].bm.traceID < qr.data[j].bm.traceID
+}
+
+func (qr queryResult) Swap(i, j int) {
+ qr.data[i], qr.data[j] = qr.data[j], qr.data[i]
+}
+
+func (qr *queryResult) Push(x interface{}) {
+ qr.data = append(qr.data, x.(*blockCursor))
+}
+
+// Pop removes the last cursor and releases it back to its pool before
+// returning it, so callers must not use the returned value.
+func (qr *queryResult) Pop() interface{} {
+ old := qr.data
+ n := len(old)
+ x := old[n-1]
+ qr.data = old[0 : n-1]
+ releaseBlockCursor(x)
+ return x
+}
+
+// merge copies spans from the heap top into a single result until the top
+// cursor's trace ID differs from the previous one or the heap is empty.
+// Because the heap key ignores idx and the heap is not re-fixed after idx++,
+// the top cursor is drained completely before the next one surfaces.
+// An empty trace ID never triggers the trace-ID boundary check.
+func (qr *queryResult) merge() *model.TraceResult {
+ result := &model.TraceResult{}
+ var lastTraceID string
+
+ for qr.Len() > 0 {
+ topBC := qr.data[0]
+ if lastTraceID != "" && topBC.bm.traceID != lastTraceID {
+ return result
+ }
+ lastTraceID = topBC.bm.traceID
+
+ topBC.copyTo(result)
+ topBC.idx++
+
+ // Exhausted cursors are popped (and released by Pop).
+ if topBC.idx >= len(topBC.spans) {
+ heap.Pop(qr)
+ }
+ }
+
+ return result
+}
+
func mustEncodeTagValue(name string, tagType databasev1.TagType, tagValue
*modelv1.TagValue, num int) [][]byte {
values := make([][]byte, num)
tv := encodeTagValue(name, tagType, tagValue)
diff --git a/banyand/trace/query_test.go b/banyand/trace/query_test.go
new file mode 100644
index 00000000..64569b46
--- /dev/null
+++ b/banyand/trace/query_test.go
@@ -0,0 +1,202 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/protobuf/testing/protocmp"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/query/model"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ "github.com/apache/skywalking-banyandb/pkg/watcher"
+)
+
+func TestQueryResult(t *testing.T) {
+ tests := []struct {
+ wantErr error
+ name string
+ tracesList []*traces
+ traceID string
+ want []model.TraceResult
+ minTimestamp int64
+ maxTimestamp int64
+ }{
+ {
+ name: "Test with single trace data from tsTS1",
+ tracesList: []*traces{tsTS1},
+ traceID: "trace1",
+ minTimestamp: 1,
+ maxTimestamp: 1,
+ want: []model.TraceResult{{
+ Error: nil,
+ TID: "trace1",
+ Tags: []model.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"})}},
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value1")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(10)}},
+ },
+ Spans: [][]byte{[]byte("span1")},
+ }},
+ },
+ {
+ name: "Test with multiple trace data from tsTS1
and tsTS2",
+ tracesList: []*traces{tsTS1, tsTS2},
+ traceID: "trace1",
+ minTimestamp: 1,
+ maxTimestamp: 2,
+ want: []model.TraceResult{{
+ Error: nil,
+ TID: "trace1",
+ Tags: []model.Tag{
+ {Name: "strArrTag", Values:
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"}),
strArrTagValue([]string{"value5", "value6"})}},
+ {Name: "strTag", Values:
[]*modelv1.TagValue{strTagValue("value1"), strTagValue("value4")}},
+ {Name: "intTag", Values:
[]*modelv1.TagValue{int64TagValue(10), int64TagValue(40)}},
+ },
+ Spans: [][]byte{[]byte("span1"),
[]byte("span4")},
+ }},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ verify := func(t *testing.T, tst *tsTable) {
+ defer tst.Close()
+ queryOpts := queryOptions{
+ minTimestamp: tt.minTimestamp,
+ maxTimestamp: tt.maxTimestamp,
+ }
+ s := tst.currentSnapshot()
+ require.NotNil(t, s)
+ defer s.decRef()
+ pp, _ := s.getParts(nil,
queryOpts.minTimestamp, queryOpts.maxTimestamp)
+ bma := generateBlockMetadataArray()
+ defer releaseBlockMetadataArray(bma)
+ ti := &tstIter{}
+ ti.init(bma, pp, tt.traceID)
+
+ var result queryResult
+ result.ctx = context.TODO()
+ // Query all tags
+ result.tagProjection = allTagProjections
+ for ti.nextBlock() {
+ bc := generateBlockCursor()
+ p := ti.piPool[ti.idx]
+ opts := queryOpts
+ opts.TagProjection = allTagProjections
+ bc.init(p.p, p.curBlock, opts)
+ result.data = append(result.data, bc)
+ }
+ defer result.Release()
+
+ var got []model.TraceResult
+ for {
+ r := result.Pull()
+ if r == nil {
+ break
+ }
+ got = append(got, *r)
+ }
+
+ if !errors.Is(ti.Error(), tt.wantErr) {
+ t.Errorf("Unexpected error: got %v,
want %v", ti.err, tt.wantErr)
+ }
+
+ if diff := cmp.Diff(got, tt.want,
+ protocmp.IgnoreUnknown(),
protocmp.Transform()); diff != "" {
+ t.Errorf("Unexpected []pbv1.Result
(-got +want):\n%s", diff)
+ }
+ }
+
+ t.Run("memory snapshot", func(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+ tst := &tsTable{
+ loopCloser: run.NewCloser(2),
+ introductions: make(chan *introduction),
+ fileSystem: fs.NewLocalFileSystem(),
+ root: tmpPath,
+ }
+ tst.gc.init(tst)
+ flushCh := make(chan *flusherIntroduction)
+ mergeCh := make(chan *mergerIntroduction)
+ introducerWatcher := make(watcher.Channel, 1)
+ go tst.introducerLoop(flushCh, mergeCh,
introducerWatcher, 1)
+ for _, traces := range tt.tracesList {
+ tst.mustAddTraces(traces)
+ time.Sleep(100 * time.Millisecond)
+ }
+ verify(t, tst)
+ })
+
+ t.Run("file snapshot", func(t *testing.T) {
+ // Initialize a tstIter object.
+ tmpPath, defFn := test.Space(require.New(t))
+ fileSystem := fs.NewLocalFileSystem()
+ defer defFn()
+ tst, err := newTSTable(fileSystem, tmpPath,
common.Position{},
+ logger.GetLogger("test"),
timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy:
newDefaultMergePolicyForTesting(), protector: protector.Nop{}}, nil)
+ require.NoError(t, err)
+ for _, traces := range tt.tracesList {
+ tst.mustAddTraces(traces)
+ time.Sleep(100 * time.Millisecond)
+ }
+ // wait until the introducer is done
+ if len(tt.tracesList) > 0 {
+ for {
+ snp := tst.currentSnapshot()
+ if snp == nil {
+ time.Sleep(100 *
time.Millisecond)
+ continue
+ }
+ if snp.creator ==
snapshotCreatorMemPart {
+ snp.decRef()
+ time.Sleep(100 *
time.Millisecond)
+ continue
+ }
+ snp.decRef()
+ tst.Close()
+ break
+ }
+ }
+
+ // reopen the table
+ tst, err = newTSTable(fileSystem, tmpPath,
common.Position{},
+ logger.GetLogger("test"),
timestamp.TimeRange{}, option{
+ flushTimeout:
defaultFlushTimeout, mergePolicy: newDefaultMergePolicyForTesting(),
+ protector: protector.Nop{},
+ }, nil)
+ require.NoError(t, err)
+
+ verify(t, tst)
+ })
+ })
+ }
+}
diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go
index 243ebd13..3bbded02 100644
--- a/banyand/trace/snapshot.go
+++ b/banyand/trace/snapshot.go
@@ -67,13 +67,14 @@ type snapshot struct {
ref int32
}
-func (s *snapshot) getParts(dst []*part, minTimestamp, maxTimestamp int64)
([]*part, int) {
+func (s *snapshot) getParts(dst []*part, minTimestamp int64, maxTimestamp
int64) ([]*part, int) {
var count int
for _, p := range s.parts {
pm := p.p.partMetadata
if maxTimestamp < pm.MinTimestamp || minTimestamp >
pm.MaxTimestamp {
continue
}
+ // TODO: filter parts
dst = append(dst, p.p)
count++
}
diff --git a/banyand/trace/tracing.go b/banyand/trace/tracing.go
index 84891858..9c996424 100644
--- a/banyand/trace/tracing.go
+++ b/banyand/trace/tracing.go
@@ -23,11 +23,14 @@ import (
"time"
"github.com/dustin/go-humanize"
+
+ "github.com/apache/skywalking-banyandb/pkg/query"
)
+// TODO: check the header.
const (
partMetadataHeader = "MinTimestamp, MaxTimestamp, CompressionSize,
UncompressedSize, TotalCount, BlocksCount"
- blockHeader = "PartID, SeriesID, MinTimestamp, MaxTimestamp,
Count, UncompressedSize"
+ blockHeader = "PartID, TraceID, Count, UncompressedSize"
)
func (pm *partMetadata) String() string {
@@ -45,6 +48,26 @@ func (bc *blockCursor) String() string {
bc.p.partMetadata.ID, bc.bm.traceID, bc.bm.count,
humanize.Bytes(bc.bm.uncompressedSpanSizeBytes))
}
-func startBlockScanSpan(_ context.Context, _ int, _ []*part) func() {
- panic("unimplemented")
+func startBlockScanSpan(ctx context.Context, tid string, parts []*part, qr
*queryResult) func() {
+ tracer := query.GetTracer(ctx)
+ if tracer == nil {
+ return func() {}
+ }
+
+ span, _ := tracer.StartSpan(ctx, "scan-blocks")
+ span.Tag("trace_id", tid)
+ span.Tag("part_header", partMetadataHeader)
+ span.Tag("part_num", fmt.Sprintf("%d", len(parts)))
+ for i := range parts {
+ span.Tag(fmt.Sprintf("part_%d_%s", parts[i].partMetadata.ID,
parts[i].path),
+ parts[i].partMetadata.String())
+ }
+
+ return func() {
+ span.Tag("block_header", blockHeader)
+ for i := range qr.data {
+ span.Tag(fmt.Sprintf("block_%d", i),
qr.data[i].String())
+ }
+ span.Stop()
+ }
}
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index cd0b1075..7443831b 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -18,7 +18,6 @@
package trace
import (
- "container/heap"
"encoding/json"
"errors"
"fmt"
@@ -310,11 +309,10 @@ func (tst *tsTable) mustAddTraces(ts *traces) {
}
type tstIter struct {
- err error
- parts []*part
- piPool []partIter
- piHeap partIterHeap
- nextBlockNoop bool
+ err error
+ parts []*part
+ piPool []*partIter
+ idx int
}
func (ti *tstIter) reset() {
@@ -328,55 +326,33 @@ func (ti *tstIter) reset() {
}
ti.piPool = ti.piPool[:0]
- for i := range ti.piHeap {
- ti.piHeap[i] = nil
- }
- ti.piHeap = ti.piHeap[:0]
-
ti.err = nil
- ti.nextBlockNoop = false
+ ti.idx = 0
}
-func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, tids []string,
minTimestamp, maxTimestamp int64) {
+func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, tid string) {
ti.reset()
ti.parts = parts
if n := len(ti.parts) - cap(ti.piPool); n > 0 {
- ti.piPool = append(ti.piPool[:cap(ti.piPool)], make([]partIter,
n)...)
+ ti.piPool = append(ti.piPool[:cap(ti.piPool)],
make([]*partIter, n)...)
}
ti.piPool = ti.piPool[:len(ti.parts)]
for i, p := range ti.parts {
- ti.piPool[i].init(bma, p, tids, minTimestamp, maxTimestamp)
+ ti.piPool[i] = &partIter{}
+ ti.piPool[i].init(bma, p, tid)
}
- ti.piHeap = ti.piHeap[:0]
- for i := range ti.piPool {
- ps := &ti.piPool[i]
- if !ps.nextBlock() {
- if err := ps.error(); err != nil {
- ti.err = fmt.Errorf("cannot initialize tsTable
iteration: %w", err)
- return
- }
- continue
- }
- ti.piHeap = append(ti.piHeap, ps)
- }
- if len(ti.piHeap) == 0 {
+ if len(ti.piPool) == 0 {
ti.err = io.EOF
return
}
- heap.Init(&ti.piHeap)
- ti.nextBlockNoop = true
}
func (ti *tstIter) nextBlock() bool {
if ti.err != nil {
return false
}
- if ti.nextBlockNoop {
- ti.nextBlockNoop = false
- return true
- }
ti.err = ti.next()
if ti.err != nil {
@@ -389,22 +365,17 @@ func (ti *tstIter) nextBlock() bool {
}
func (ti *tstIter) next() error {
- psMin := ti.piHeap[0]
- if psMin.nextBlock() {
- heap.Fix(&ti.piHeap, 0)
- return nil
- }
-
- if err := psMin.error(); err != nil {
- return err
- }
-
- heap.Pop(&ti.piHeap)
-
- if len(ti.piHeap) == 0 {
- return io.EOF
+ for ti.idx < len(ti.piPool) {
+ pi := ti.piPool[ti.idx]
+ if pi.nextBlock() {
+ return nil
+ }
+ if err := pi.error(); err != nil {
+ return err
+ }
+ ti.idx++
}
- return nil
+ return io.EOF
}
func (ti *tstIter) Error() error {
@@ -428,30 +399,3 @@ func releaseTstIter(ti *tstIter) {
}
var tstIterPool = pool.Register[*tstIter]("trace-tstIter")
-
-type partIterHeap []*partIter
-
-func (pih *partIterHeap) Len() int {
- return len(*pih)
-}
-
-func (pih *partIterHeap) Less(i, j int) bool {
- x := *pih
- return x[i].curBlock.less(x[j].curBlock)
-}
-
-func (pih *partIterHeap) Swap(i, j int) {
- x := *pih
- x[i], x[j] = x[j], x[i]
-}
-
-func (pih *partIterHeap) Push(x any) {
- *pih = append(*pih, x.(*partIter))
-}
-
-func (pih *partIterHeap) Pop() any {
- a := *pih
- v := a[len(a)-1]
- *pih = a[:len(a)-1]
- return v
-}
diff --git a/banyand/trace/tstable_test.go b/banyand/trace/tstable_test.go
index 169b101f..2ee0c4db 100644
--- a/banyand/trace/tstable_test.go
+++ b/banyand/trace/tstable_test.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/fs"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -115,10 +116,10 @@ func Test_tsTable_mustAddTraces(t *testing.T) {
func Test_tstIter(t *testing.T) {
type testCtx struct {
+ tsList []*traces
wantErr error
name string
- tsList []*traces
- tids []string
+ tid string
want []blockMetadata
minTimestamp int64
maxTimestamp int64
@@ -136,14 +137,14 @@ func Test_tstIter(t *testing.T) {
pp, n := s.getParts(nil, tt.minTimestamp, tt.maxTimestamp)
require.Equal(t, len(s.parts), n)
ti := &tstIter{}
- ti.init(bma, pp, tt.tids, tt.minTimestamp, tt.maxTimestamp)
+ ti.init(bma, pp, tt.tid)
var got []blockMetadata
for ti.nextBlock() {
- if ti.piHeap[0].curBlock.traceID == "" {
+ if ti.piPool[ti.idx].curBlock.traceID == "" {
t.Errorf("Expected curBlock to be initialized,
but it was nil")
}
var bm blockMetadata
- bm.copyFrom(ti.piHeap[0].curBlock)
+ bm.copyFrom(ti.piPool[ti.idx].curBlock)
got = append(got, bm)
}
@@ -166,37 +167,28 @@ func Test_tstIter(t *testing.T) {
t.Run("memory snapshot", func(t *testing.T) {
tests := []testCtx{
{
- name: "Test with no traces",
- tsList: []*traces{},
- tids: []string{"trace1", "trace2",
"trace3"},
- minTimestamp: 1,
- maxTimestamp: 1,
+ name: "Test with no traces",
+ tsList: []*traces{},
},
{
name: "Test with single part",
tsList: []*traces{tsTS1},
- tids: []string{"trace1", "trace2",
"trace3"},
+ tid: "trace1",
minTimestamp: 1,
- maxTimestamp: 1,
+ maxTimestamp: 2,
want: []blockMetadata{
{traceID: "trace1", count: 1,
uncompressedSpanSizeBytes: 5},
- {traceID: "trace2", count: 1,
uncompressedSpanSizeBytes: 5},
- {traceID: "trace3", count: 1,
uncompressedSpanSizeBytes: 5},
},
},
{
name: "Test with multiple parts",
tsList: []*traces{tsTS1, tsTS2},
- tids: []string{"trace1", "trace2",
"trace3"},
+ tid: "trace1",
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
{traceID: "trace1", count: 1,
uncompressedSpanSizeBytes: 5},
{traceID: "trace1", count: 1,
uncompressedSpanSizeBytes: 5},
- {traceID: "trace2", count: 1,
uncompressedSpanSizeBytes: 5},
- {traceID: "trace2", count: 1,
uncompressedSpanSizeBytes: 5},
- {traceID: "trace3", count: 1,
uncompressedSpanSizeBytes: 5},
- {traceID: "trace3", count: 1,
uncompressedSpanSizeBytes: 5},
},
},
}
@@ -225,6 +217,10 @@ func Test_tstIter(t *testing.T) {
})
}
+var allTagProjections = &model.TagProjection{
+ Names: []string{"strArrTag", "strTag", "intTag"},
+}
+
var tsTS1 = &traces{
traceIDs: []string{"trace1", "trace2", "trace3"},
timestamps: []int64{1, 1, 1},
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index e0107d86..f3349d06 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -348,7 +348,13 @@ func (t *TraceQueryOptions) CopyFrom(other
*TraceQueryOptions) {
// TraceResult is the result of a query.
type TraceResult struct {
Error error
- Tags []Tag
Spans [][]byte
- TIDs []string
+ TID string
+ Tags []Tag
+}
+
+// TraceQueryResult is the result of a trace query.
+type TraceQueryResult interface {
+ Pull() *TraceResult
+ Release()
}