This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new ec127ed6 Fix the bug that sidx doesn't support array value tags (#793)
ec127ed6 is described below
commit ec127ed6a66ccf52a97d3c9f1568f2d2e18dbedd
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Sep 27 11:48:30 2025 +0800
Fix the bug that sidx doesn't support array value tags (#793)
---
.golangci.yml | 9 --
banyand/internal/encoding/tag_encoder.go | 27 +++++
banyand/internal/encoding/tag_encoder_test.go | 46 +++++++++
banyand/internal/sidx/block.go | 20 +++-
banyand/internal/sidx/block_test.go | 48 +++++----
banyand/internal/sidx/element.go | 42 ++++++--
banyand/internal/sidx/interfaces.go | 48 ++++-----
banyand/internal/sidx/multi_sidx_query_test.go | 1 -
banyand/internal/sidx/part.go | 37 -------
banyand/internal/sidx/sidx_test.go | 52 +++++++++-
banyand/internal/sidx/snapshot.go | 27 -----
banyand/internal/sidx/tag.go | 26 -----
banyand/internal/sidx/tag_test.go | 6 +-
banyand/trace/block.go | 4 -
banyand/trace/bloom_filter.go | 16 ---
banyand/trace/metrics.go | 5 +-
banyand/trace/snapshot.go | 84 ---------------
banyand/trace/syncer.go | 9 +-
banyand/trace/tag.go | 3 +-
banyand/trace/trace_suite_test.go | 113 ---------------------
banyand/trace/traces.go | 58 +----------
banyand/trace/write_data.go | 14 +--
banyand/trace/write_standalone.go | 18 +++-
.../trace/data/input/having_query_tag_cond.yml | 30 ++++++
test/cases/trace/trace.go | 1 +
25 files changed, 289 insertions(+), 455 deletions(-)
diff --git a/.golangci.yml b/.golangci.yml
index eadf4197..da1c435a 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -152,13 +152,4 @@ issues:
- linters:
- staticcheck
text: "SA1019: package github.com/golang/protobuf"
- # TODO: remove this after the trace is done
- - path: "^trace/"
- linters:
- - unused
- - unparam
- # TODO: remove this after the sidx is done
- - path: "internal/sidx/"
- linters:
- - unused
max-same-issues: 0
diff --git a/banyand/internal/encoding/tag_encoder.go b/banyand/internal/encoding/tag_encoder.go
index 78bcf0bf..4800572d 100644
--- a/banyand/internal/encoding/tag_encoder.go
+++ b/banyand/internal/encoding/tag_encoder.go
@@ -21,6 +21,8 @@
package encoding
import (
+ stdbytes "bytes"
+
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -29,6 +31,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/pool"
)
+const (
+ // EntityDelimiter is the delimiter for entities in a variable-length array.
+ EntityDelimiter = '|'
+ // Escape is the escape character for entities in a variable-length array.
+ Escape = '\\'
+)
+
var (
int64SlicePool = pool.Register[*[]int64]("tag-encoder-int64Slice")
float64SlicePool = pool.Register[*[]float64]("tag-encoder-float64Slice")
@@ -86,6 +95,24 @@ func releaseDictionary(d *encoding.Dictionary) {
dictionaryPool.Put(d)
}
+// MarshalVarArray marshals a byte slice into a variable-length array format.
+// It escapes delimiter and escape characters within the source slice.
+func MarshalVarArray(dest, src []byte) []byte {
+ if stdbytes.IndexByte(src, EntityDelimiter) < 0 && stdbytes.IndexByte(src, Escape) < 0 {
+ dest = append(dest, src...)
+ dest = append(dest, EntityDelimiter)
+ return dest
+ }
+ for _, b := range src {
+ if b == EntityDelimiter || b == Escape {
+ dest = append(dest, Escape)
+ }
+ dest = append(dest, b)
+ }
+ dest = append(dest, EntityDelimiter)
+ return dest
+}
+
// EncodeTagValues encodes tag values based on the value type with optimal
compression.
// For int64: uses delta encoding with first value storage.
// For float64: converts to decimal integers with exponent, then delta
encoding.
diff --git a/banyand/internal/encoding/tag_encoder_test.go b/banyand/internal/encoding/tag_encoder_test.go
index 52e736ed..fc6aa069 100644
--- a/banyand/internal/encoding/tag_encoder_test.go
+++ b/banyand/internal/encoding/tag_encoder_test.go
@@ -198,3 +198,49 @@ func TestEncodeDecodeTagValues_Int64_EmptyInput(t
*testing.T) {
require.NoError(t, err)
assert.Nil(t, decoded)
}
+
+func TestMarshalVarArray(t *testing.T) {
+ tests := []struct {
+ name string
+ input []byte
+ expected []byte
+ }{
+ {
+ name: "empty",
+ input: []byte{},
+ expected: []byte{'|'},
+ },
+ {
+ name: "no special chars",
+ input: []byte("abc"),
+ expected: []byte("abc|"),
+ },
+ {
+ name: "with delimiter",
+ input: []byte("a|b"),
+ expected: []byte("a\\|b|"),
+ },
+ {
+ name: "with escape",
+ input: []byte("a\\b"),
+ expected: []byte("a\\\\b|"),
+ },
+ {
+ name: "with delimiter and escape",
+ input: []byte("a|\\b"),
+ expected: []byte("a\\|\\\\b|"),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert.Equal(t, tt.expected, MarshalVarArray(nil, tt.input))
+ })
+ }
+
+ t.Run("multiple values", func(t *testing.T) {
+ var result []byte
+ result = MarshalVarArray(result, []byte("a|b"))
+ result = MarshalVarArray(result, []byte("c\\d"))
+ assert.Equal(t, []byte("a\\|b|c\\\\d|"), result)
+ })
+}
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index 524fdb78..55dd22f3 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -112,13 +112,12 @@ func (b *block) processTag(tagName string, elementTags
[][]*tag) {
td.values = make([][]byte, len(b.userKeys))
var valueType pbv1.ValueType
-
// Collect values for this tag across all elements
for i, tags := range elementTags {
found := false
for _, tag := range tags {
if tag.name == tagName {
- td.values[i] = tag.value
+ td.values[i] = tag.marshal()
valueType = tag.valueType
found = true
break
@@ -133,9 +132,20 @@ func (b *block) processTag(tagName string, elementTags
[][]*tag) {
// Create bloom filter for indexed tags
td.filter = generateBloomFilter(len(b.userKeys))
- for _, value := range td.values {
- if value != nil {
- td.filter.Add(value)
+ for _, tags := range elementTags {
+ for _, tag := range tags {
+ if tag.name == tagName {
+ if tag.valueArr != nil {
+ for _, v := range tag.valueArr {
+ if v != nil {
+ td.filter.Add(v)
+ }
+ }
+ } else if tag.value != nil {
+ td.filter.Add(tag.value)
+ }
+ break
+ }
}
}
diff --git a/banyand/internal/sidx/block_test.go b/banyand/internal/sidx/block_test.go
index b3ed92c9..64627ef4 100644
--- a/banyand/internal/sidx/block_test.go
+++ b/banyand/internal/sidx/block_test.go
@@ -19,6 +19,10 @@ package sidx
import (
"testing"
+
+ "github.com/stretchr/testify/assert"
+
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)
func TestBlock_BasicOperations(t *testing.T) {
@@ -127,26 +131,36 @@ func TestBlock_KeyRange(t *testing.T) {
}
}
-func TestBlock_MemoryManagement(t *testing.T) {
+func TestBlock_ProcessTag_WithArrValues(t *testing.T) {
b := generateBlock()
defer releaseBlock(b)
- // Add some normal-sized data
- b.data = append(b.data, make([]byte, 100), make([]byte, 200))
-
- // Add an oversized slice (larger than maxPooledSliceSize)
- oversizedData := make([]byte, maxPooledSliceSize+1)
- b.data = append(b.data, oversizedData)
-
- // Reset should handle both normal and oversized slices correctly
- b.reset()
-
- // After reset, data slice should be empty but not nil (since the outer
slice is within limits)
- if b.data == nil {
- t.Error("Data slice should not be nil after reset when within
count limits")
+ b.userKeys = []int64{100, 101}
+ elementTags := [][]*tag{
+ {
+ {
+ name: "arr_tag",
+ valueArr: [][]byte{
+ []byte("a"),
+ []byte("b"),
+ },
+ valueType: pbv1.ValueTypeStrArr,
+ },
+ },
+ {
+ {
+ name: "arr_tag",
+ value: []byte("c"),
+ valueType: pbv1.ValueTypeStr,
+ },
+ },
}
- if len(b.data) != 0 {
- t.Errorf("Data slice should be empty after reset, got length
%d", len(b.data))
- }
+ b.processTag("arr_tag", elementTags)
+
+ assert.Equal(t, "a|b|", string(b.tags["arr_tag"].values[0]))
+ assert.Equal(t, "c", string(b.tags["arr_tag"].values[1]))
+ assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("a")))
+ assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("b")))
+ assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("c")))
}
diff --git a/banyand/internal/sidx/element.go b/banyand/internal/sidx/element.go
index 724df953..d9987e7e 100644
--- a/banyand/internal/sidx/element.go
+++ b/banyand/internal/sidx/element.go
@@ -22,18 +22,16 @@ package sidx
import (
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
)
-const (
- maxPooledSliceSize = 1024 * 1024 // 1MB
-)
-
// tag represents an individual tag (not tag family like stream).
type tag struct {
name string
value []byte
+ valueArr [][]byte
valueType pbv1.ValueType
}
@@ -49,9 +47,26 @@ type elements struct {
func (t *tag) reset() {
t.name = ""
t.value = nil
+ t.valueArr = nil
t.valueType = pbv1.ValueTypeUnknown
}
+// marshal marshals the tag value to a byte slice.
+func (t *tag) marshal() []byte {
+ if t.valueArr != nil {
+ var dst []byte
+ for i := range t.valueArr {
+ if t.valueType == pbv1.ValueTypeInt64Arr {
+ dst = append(dst, t.valueArr[i]...)
+ continue
+ }
+ dst = encoding.MarshalVarArray(dst, t.valueArr[i])
+ }
+ return dst
+ }
+ return t.value
+}
+
// reset elements collection for pooling.
func (e *elements) reset() {
e.seriesIDs = e.seriesIDs[:0]
@@ -73,7 +88,15 @@ func (e *elements) reset() {
// size returns the size of the tag in bytes.
func (t *tag) size() int {
- return len(t.name) + len(t.value) + 1 // +1 for valueType
+ size := len(t.name) + 1 // +1 for valueType
+ if t.valueArr != nil {
+ for _, v := range t.valueArr {
+ size += len(v)
+ }
+ } else {
+ size += len(t.value)
+ }
+ return size
}
// size returns the total size of all elements.
@@ -166,7 +189,14 @@ func (e *elements) mustAppend(seriesID common.SeriesID,
userKey int64, data []by
for _, t := range tags {
newTag := generateTag()
newTag.name = t.Name
- newTag.value = append([]byte(nil), t.Value...)
+ if t.ValueArr != nil {
+ newTag.valueArr = make([][]byte, len(t.ValueArr))
+ for i, v := range t.ValueArr {
+ newTag.valueArr[i] = append(newTag.valueArr[i][:0], v...)
+ }
+ } else {
+ newTag.value = append(newTag.value[:0], t.Value...)
+ }
newTag.valueType = t.ValueType
elementTags = append(elementTags, newTag)
}
diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go
index 94cf6d29..8b61c802 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -342,28 +342,29 @@ func (rm *ResponseMetadata) Validate() error {
type Tag struct {
Name string
Value []byte
+ ValueArr [][]byte
ValueType pbv1.ValueType
}
-// NewTag creates a new Tag instance with the given values.
-func NewTag(name string, value []byte, valueType pbv1.ValueType) Tag {
- return Tag{
- Name: name,
- Value: value,
- ValueType: valueType,
- }
-}
-
// Reset resets the Tag to its zero state for reuse.
func (t *Tag) Reset() {
t.Name = ""
t.Value = nil
+ t.ValueArr = nil
t.ValueType = pbv1.ValueTypeUnknown
}
// Size returns the size of the tag in bytes.
func (t *Tag) Size() int {
- return len(t.Name) + len(t.Value) + 1 // +1 for valueType
+ size := len(t.Name) + 1 // +1 for valueType
+ if t.ValueArr != nil {
+ for _, v := range t.ValueArr {
+ size += len(v)
+ }
+ } else {
+ size += len(t.Value)
+ }
+ return size
}
// Copy creates a deep copy of the Tag.
@@ -373,31 +374,22 @@ func (t *Tag) Copy() Tag {
valueCopy = make([]byte, len(t.Value))
copy(valueCopy, t.Value)
}
+ var valueArrCopy [][]byte
+ if t.ValueArr != nil {
+ valueArrCopy = make([][]byte, len(t.ValueArr))
+ for i, v := range t.ValueArr {
+ valueArrCopy[i] = make([]byte, len(v))
+ copy(valueArrCopy[i], v)
+ }
+ }
return Tag{
Name: t.Name,
Value: valueCopy,
+ ValueArr: valueArrCopy,
ValueType: t.ValueType,
}
}
-// toInternalTag converts the exported Tag to an internal tag for use with the
pooling system.
-func (t *Tag) toInternalTag() *tag {
- return &tag{
- name: t.Name,
- value: t.Value,
- valueType: t.ValueType,
- }
-}
-
-// fromInternalTag creates a Tag from an internal tag.
-func fromInternalTag(t *tag) Tag {
- return Tag{
- Name: t.name,
- Value: t.value,
- ValueType: t.valueType,
- }
-}
-
// Validate validates a WriteRequest for correctness.
func (wr WriteRequest) Validate() error {
if wr.SeriesID == 0 {
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go b/banyand/internal/sidx/multi_sidx_query_test.go
index b15553b7..155c06bf 100644
--- a/banyand/internal/sidx/multi_sidx_query_test.go
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -35,7 +35,6 @@ type mockSIDX struct {
err error
response *QueryResponse
name string
- delay bool
}
func (m *mockSIDX) MustAddMemPart(_ context.Context, _ *memPart) {}
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index 94c6b7f7..5b02efbb 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -458,11 +458,6 @@ func (p *part) String() string {
return fmt.Sprintf("sidx part at %s", p.path)
}
-// getPartMetadata returns the part metadata.
-func (p *part) getPartMetadata() *partMetadata {
- return p.partMetadata
-}
-
// getTagDataReader returns the tag data reader for the specified tag name.
func (p *part) getTagDataReader(tagName string) (fs.Reader, bool) {
reader, exists := p.tagData[tagName]
@@ -481,38 +476,6 @@ func (p *part) getTagFilterReader(tagName string)
(fs.Reader, bool) {
return reader, exists
}
-// getAvailableTagNames returns a slice of all available tag names in this
part.
-func (p *part) getAvailableTagNames() []string {
- tagNames := make(map[string]struct{})
-
- // Collect tag names from all tag file types.
- for tagName := range p.tagData {
- tagNames[tagName] = struct{}{}
- }
- for tagName := range p.tagMetadata {
- tagNames[tagName] = struct{}{}
- }
- for tagName := range p.tagFilters {
- tagNames[tagName] = struct{}{}
- }
-
- // Convert to slice.
- result := make([]string, 0, len(tagNames))
- for tagName := range tagNames {
- result = append(result, tagName)
- }
-
- return result
-}
-
-// hasTagFiles returns true if the part has any tag files for the specified
tag name.
-func (p *part) hasTagFiles(tagName string) bool {
- _, hasData := p.tagData[tagName]
- _, hasMeta := p.tagMetadata[tagName]
- _, hasFilter := p.tagFilters[tagName]
- return hasData || hasMeta || hasFilter
-}
-
// Path returns the part's directory path.
func (p *part) Path() string {
return p.path
diff --git a/banyand/internal/sidx/sidx_test.go b/banyand/internal/sidx/sidx_test.go
index 95aef2b0..1025cad1 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -35,10 +35,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/model"
)
-const partIDForTesting = 1
-
// Test helper functions.
func waitForIntroducerLoop() {
@@ -403,6 +402,55 @@ func TestSIDX_Query_Ordering(t *testing.T) {
}
}
+func TestSIDX_Query_WithArrValues(t *testing.T) {
+ sidx := createTestSIDX(t)
+ defer func() {
+ assert.NoError(t, sidx.Close())
+ }()
+
+ ctx := context.Background()
+
+ // Write test data with different keys
+ reqs := []WriteRequest{
+ createTestWriteRequest(1, 100, "data100", Tag{
+ Name: "arr_tag",
+ ValueArr: [][]byte{
+ []byte("a"),
+ []byte("b"),
+ },
+ ValueType: pbv1.ValueTypeStrArr,
+ }),
+ createTestWriteRequest(1, 150, "data150"),
+ createTestWriteRequest(1, 200, "data200"),
+ }
+ err := sidx.Write(ctx, reqs, 1)
+ require.NoError(t, err)
+
+ // Wait for introducer loop to process
+ waitForIntroducerLoop()
+
+ queryReq := QueryRequest{
+ SeriesIDs: []common.SeriesID{1},
+ TagProjection: []model.TagProjection{
+ {
+ Names: []string{"arr_tag"},
+ },
+ },
+ }
+
+ response, err := sidx.Query(ctx, queryReq)
+ require.NoError(t, err)
+ require.NotNil(t, response)
+
+ assert.Equal(t, 3, response.Len())
+ for i := 0; i < response.Len(); i++ {
+ if response.Keys[i] == 100 {
+ assert.Equal(t, "arr_tag", response.Tags[i][0].Name)
+ assert.Equal(t, "a|b|", string(response.Tags[i][0].Value))
+ }
+ }
+}
+
func TestSIDX_Query_Validation(t *testing.T) {
sidx := createTestSIDX(t)
defer func() {
diff --git a/banyand/internal/sidx/snapshot.go b/banyand/internal/sidx/snapshot.go
index 809c0763..c16d9e30 100644
--- a/banyand/internal/sidx/snapshot.go
+++ b/banyand/internal/sidx/snapshot.go
@@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"path/filepath"
- "sort"
"strconv"
"sync/atomic"
@@ -186,32 +185,6 @@ func (s *snapshot) validate() error {
return nil
}
-// sortPartsByEpoch sorts parts by their epoch (ID), oldest first.
-// This ensures consistent iteration order during queries.
-func (s *snapshot) sortPartsByEpoch() {
- sort.Slice(s.parts, func(i, j int) bool {
- partI := s.parts[i].p
- partJ := s.parts[j].p
-
- if partI == nil || partI.partMetadata == nil {
- return false
- }
- if partJ == nil || partJ.partMetadata == nil {
- return true
- }
-
- return partI.partMetadata.ID < partJ.partMetadata.ID
- })
-}
-
-// copyParts creates a copy of the parts slice for safe iteration.
-// The caller should acquire references to parts they intend to use.
-func (s *snapshot) copyParts() []*partWrapper {
- result := make([]*partWrapper, len(s.parts))
- copy(result, s.parts)
- return result
-}
-
// addPart adds a new part to the snapshot during construction.
// This should only be called before the snapshot is made available to other
goroutines.
// After construction, snapshots should be treated as immutable.
diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go
index 2db4ec53..d02ebf24 100644
--- a/banyand/internal/sidx/tag.go
+++ b/banyand/internal/sidx/tag.go
@@ -18,7 +18,6 @@
package sidx
import (
- "bytes"
"fmt"
pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -217,31 +216,6 @@ func (td *tagData) updateMinMax() {
}
}
-// addValue adds a value to the tag data.
-func (td *tagData) addValue(value []byte) {
- td.values = append(td.values, value)
-
- // Update filter for indexed tags
- if td.filter != nil {
- td.filter.Add(value)
- }
-}
-
-// hasValue checks if a value exists in the tag using the bloom filter.
-func (td *tagData) hasValue(value []byte) bool {
- if td.filter == nil {
- // For non-indexed tags, do linear search
- for _, v := range td.values {
- if bytes.Equal(v, value) {
- return true
- }
- }
- return false
- }
-
- return td.filter.MightContain(value)
-}
-
// marshal serializes tag metadata to bytes using encoding package.
func (tm *tagMetadata) marshal(dst []byte) []byte {
dst = pkgencoding.EncodeBytes(dst, []byte(tm.name))
diff --git a/banyand/internal/sidx/tag_test.go b/banyand/internal/sidx/tag_test.go
index 19ef70cf..2f96ff2e 100644
--- a/banyand/internal/sidx/tag_test.go
+++ b/banyand/internal/sidx/tag_test.go
@@ -41,7 +41,11 @@ func TestTagExportedFields(t *testing.T) {
func TestNewTag(t *testing.T) {
// Test the NewTag constructor function
- tag := NewTag("service", []byte("order-service"), pbv1.ValueTypeStr)
+ tag := Tag{
+ Name: "service",
+ Value: []byte("order-service"),
+ ValueType: pbv1.ValueTypeStr,
+ }
assert.Equal(t, "service", tag.Name)
assert.Equal(t, []byte("order-service"), tag.Value)
diff --git a/banyand/trace/block.go b/banyand/trace/block.go
index ca6c4073..9ba12bee 100644
--- a/banyand/trace/block.go
+++ b/banyand/trace/block.go
@@ -580,10 +580,6 @@ func assertIdxAndOffset(name string, length int, idx int,
offset int) {
}
}
-func (bi *blockPointer) isFull() bool {
- return bi.bm.uncompressedSpanSizeBytes >= maxUncompressedSpanSize
-}
-
func (bi *blockPointer) reset() {
bi.idx = 0
bi.block.reset()
diff --git a/banyand/trace/bloom_filter.go b/banyand/trace/bloom_filter.go
index 6e8efe5f..67e182b7 100644
--- a/banyand/trace/bloom_filter.go
+++ b/banyand/trace/bloom_filter.go
@@ -21,7 +21,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/filter"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/pool"
)
func encodeBloomFilter(dst []byte, bf *filter.BloomFilter) []byte {
@@ -44,18 +43,3 @@ func decodeBloomFilter(src []byte, bf *filter.BloomFilter)
*filter.BloomFilter {
return bf
}
-
-func generateBloomFilter() *filter.BloomFilter {
- v := bloomFilterPool.Get()
- if v == nil {
- return filter.NewBloomFilter(0)
- }
- return v
-}
-
-func releaseBloomFilter(bf *filter.BloomFilter) {
- bf.Reset()
- bloomFilterPool.Put(bf)
-}
-
-var bloomFilterPool = pool.Register[*filter.BloomFilter]("trace-bloomFilter")
diff --git a/banyand/trace/metrics.go b/banyand/trace/metrics.go
index 2402e308..e71514f0 100644
--- a/banyand/trace/metrics.go
+++ b/banyand/trace/metrics.go
@@ -26,9 +26,8 @@ import (
)
var (
- streamScope = observability.RootScope.SubScope("stream")
- tbScope = streamScope.SubScope("tst")
- storageScope = streamScope.SubScope("storage")
+ streamScope = observability.RootScope.SubScope("stream")
+ tbScope = streamScope.SubScope("tst")
)
type metrics struct {
diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go
index 935b8d3d..bc0ee241 100644
--- a/banyand/trace/snapshot.go
+++ b/banyand/trace/snapshot.go
@@ -18,24 +18,16 @@
package trace
import (
- "context"
"encoding/json"
"fmt"
"path/filepath"
"sort" // added for sorting parts
- "sync"
"sync/atomic"
- "time"
"github.com/pkg/errors"
- "go.uber.org/multierr"
- commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
- databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
- "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/schema"
)
func (tst *tsTable) currentSnapshot() *snapshot {
@@ -242,79 +234,3 @@ func (tst *tsTable) createMetadata(dst string, snapshot
*snapshot) {
logger.Panicf("unexpected number of bytes written to %s; got
%d; want %d", snapshotPath, n, len(data))
}
}
-
-func (s *standalone) takeGroupSnapshot(dstDir string, groupName string) error {
- group, ok := s.schemaRepo.LoadGroup(groupName)
- if !ok {
- return errors.Errorf("group %s not found", groupName)
- }
- db := group.SupplyTSDB()
- if db == nil {
- return errors.Errorf("group %s has no tsdb",
group.GetSchema().Metadata.Name)
- }
- tsdb := db.(storage.TSDB[*tsTable, option])
- if err := tsdb.TakeFileSnapshot(dstDir); err != nil {
- return errors.WithMessagef(err, "snapshot %s fail to take file
snapshot for group %s", dstDir, group.GetSchema().Metadata.Name)
- }
- return nil
-}
-
-type snapshotListener struct {
- *bus.UnImplementedHealthyListener
- s *standalone
- snapshotSeq uint64
- snapshotMux sync.Mutex
-}
-
-// Rev takes a snapshot of the database.
-func (s *snapshotListener) Rev(ctx context.Context, message bus.Message)
bus.Message {
- groups := message.Data().([]*databasev1.SnapshotRequest_Group)
- var gg []schema.Group
- if len(groups) == 0 {
- gg = s.s.schemaRepo.LoadAllGroups()
- } else {
- for _, g := range groups {
- if g.Catalog != commonv1.Catalog_CATALOG_TRACE {
- continue
- }
- group, ok := s.s.schemaRepo.LoadGroup(g.Group)
- if !ok {
- continue
- }
- gg = append(gg, group)
- }
- }
- if len(gg) == 0 {
- return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
- }
- s.snapshotMux.Lock()
- defer s.snapshotMux.Unlock()
- storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum,
s.s.lfs)
- sn := s.snapshotName()
- var err error
- for _, g := range gg {
- select {
- case <-ctx.Done():
- return
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
- default:
- }
- if errGroup :=
s.s.takeGroupSnapshot(filepath.Join(s.s.snapshotDir, sn,
g.GetSchema().Metadata.Name), g.GetSchema().Metadata.Name); err != nil {
- s.s.l.Error().Err(errGroup).Str("group",
g.GetSchema().Metadata.Name).Msg("fail to take group snapshot")
- err = multierr.Append(err, errGroup)
- continue
- }
- }
- snp := &databasev1.Snapshot{
- Name: sn,
- Catalog: commonv1.Catalog_CATALOG_TRACE,
- }
- if err != nil {
- snp.Error = err.Error()
- }
- return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), snp)
-}
-
-func (s *snapshotListener) snapshotName() string {
- s.snapshotSeq++
- return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format("20060102150405"), s.snapshotSeq)
-}
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index ba95c470..8e98a3e0 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -294,10 +294,7 @@ func (tst *tsTable) handleSyncIntroductions(partsToSync
[]*part, sidxPartsToSync
}
// Create sidx sync introductions
- sidxSyncIntroductions, err :=
tst.createSidxSyncIntroductions(sidxPartsToSync)
- if err != nil {
- return err
- }
+ sidxSyncIntroductions := tst.createSidxSyncIntroductions(sidxPartsToSync)
defer tst.releaseSidxSyncIntroductions(sidxSyncIntroductions)
// Send sync introductions
@@ -310,7 +307,7 @@ func (tst *tsTable) handleSyncIntroductions(partsToSync
[]*part, sidxPartsToSync
}
// createSidxSyncIntroductions creates sync introductions for sidx parts.
-func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync
map[string][]*sidx.Part) (map[string]*sidx.SyncIntroduction, error) {
+func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync map[string][]*sidx.Part) map[string]*sidx.SyncIntroduction {
sidxSyncIntroductions := make(map[string]*sidx.SyncIntroduction)
for name, sidxParts := range sidxPartsToSync {
if len(sidxParts) > 0 {
@@ -322,7 +319,7 @@ func (tst *tsTable)
createSidxSyncIntroductions(sidxPartsToSync map[string][]*si
sidxSyncIntroductions[name] = ssi
}
}
- return sidxSyncIntroductions, nil
+ return sidxSyncIntroductions
}
// releaseSidxSyncIntroductions releases sidx sync introductions.
diff --git a/banyand/trace/tag.go b/banyand/trace/tag.go
index bd576ece..c8c0daaf 100644
--- a/banyand/trace/tag.go
+++ b/banyand/trace/tag.go
@@ -42,14 +42,13 @@ func (t *tag) reset() {
t.values = values[:0]
}
-func (t *tag) resizeValues(valuesLen int) [][]byte {
+func (t *tag) resizeValues(valuesLen int) {
values := t.values
if n := valuesLen - cap(values); n > 0 {
values = append(values[:cap(values)], make([][]byte, n)...)
}
values = values[:valuesLen]
t.values = values
- return values
}
func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer) {
diff --git a/banyand/trace/trace_suite_test.go b/banyand/trace/trace_suite_test.go
deleted file mode 100644
index 05140094..00000000
--- a/banyand/trace/trace_suite_test.go
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package trace_test
-
-import (
- "context"
- "testing"
-
- g "github.com/onsi/ginkgo/v2"
- "github.com/onsi/gomega"
-
- "github.com/apache/skywalking-banyandb/banyand/metadata"
- "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedserver"
- "github.com/apache/skywalking-banyandb/banyand/observability"
- "github.com/apache/skywalking-banyandb/banyand/protector"
- "github.com/apache/skywalking-banyandb/banyand/queue"
- "github.com/apache/skywalking-banyandb/banyand/trace"
- "github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/test"
- "github.com/apache/skywalking-banyandb/pkg/test/flags"
- testtrace "github.com/apache/skywalking-banyandb/pkg/test/trace"
-)
-
-func TestTrace(t *testing.T) {
- gomega.RegisterFailHandler(g.Fail)
- g.RunSpecs(t, "Trace Suite")
-}
-
-var _ = g.BeforeSuite(func() {
- gomega.Expect(logger.Init(logger.Logging{
- Env: "dev",
- Level: flags.LogLevel,
- })).To(gomega.Succeed())
-})
-
-type preloadTraceService struct {
- metaSvc metadata.Service
-}
-
-func (p *preloadTraceService) Name() string {
- return "preload-trace"
-}
-
-func (p *preloadTraceService) PreRun(ctx context.Context) error {
- return testtrace.PreloadSchema(ctx, p.metaSvc.SchemaRegistry())
-}
-
-type services struct {
- trace trace.Service
- metadataService metadata.Service
- pipeline queue.Queue
-}
-
-func setUp() (*services, func()) {
- // Init Pipeline
- pipeline := queue.Local()
-
- // Init Metadata Service
- metadataService, err := embeddedserver.NewService(context.TODO())
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
-
- metricSvc := observability.NewMetricService(metadataService, pipeline,
"test", nil)
- pm := protector.NewMemory(metricSvc)
- // Init Trace Service
- traceService, err := trace.NewService(metadataService, pipeline,
metricSvc, pm)
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
- preloadTraceSvc := &preloadTraceService{metaSvc: metadataService}
- // querySvc, err := query.NewService(context.TODO(), traceService, nil,
metadataService, pipeline)
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
- var flags []string
- metaPath, metaDeferFunc, err := test.NewSpace()
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
- flags = append(flags, "--metadata-root-path="+metaPath)
- rootPath, deferFunc, err := test.NewSpace()
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
- flags = append(flags, "--trace-root-path="+rootPath)
- listenClientURL, listenPeerURL, err := test.NewEtcdListenUrls()
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
- flags = append(flags, "--etcd-listen-client-url="+listenClientURL,
"--etcd-listen-peer-url="+listenPeerURL)
- moduleDeferFunc := test.SetupModules(
- flags,
- pipeline,
- metadataService,
- preloadTraceSvc,
- traceService,
- // querySvc,
- )
- return &services{
- trace: traceService,
- metadataService: metadataService,
- pipeline: pipeline,
- }, func() {
- moduleDeferFunc()
- metaDeferFunc()
- deferFunc()
- }
-}
diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go
index bf92d649..b8129008 100644
--- a/banyand/trace/traces.go
+++ b/banyand/trace/traces.go
@@ -18,11 +18,10 @@
package trace
import (
- "bytes"
-
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
"github.com/apache/skywalking-banyandb/banyand/internal/sidx"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/internal/wqueue"
@@ -45,19 +44,6 @@ func (t *tagValue) reset() {
t.valueArr = nil
}
-func (t *tagValue) size() int {
- s := len(t.tag)
- if t.value != nil {
- s += len(t.value)
- }
- if t.valueArr != nil {
- for i := range t.valueArr {
- s += len(t.valueArr[i])
- }
- }
- return s
-}
-
func (t *tagValue) marshal() []byte {
if t.valueArr != nil {
var dst []byte
@@ -66,7 +52,7 @@ func (t *tagValue) marshal() []byte {
dst = append(dst, t.valueArr[i]...)
continue
}
- dst = marshalVarArray(dst, t.valueArr[i])
+ dst = encoding.MarshalVarArray(dst, t.valueArr[i])
}
return dst
}
@@ -88,43 +74,22 @@ func releaseTagValue(v *tagValue) {
var tagValuePool = pool.Register[*tagValue]("trace-tagValue")
-const (
- entityDelimiter = '|'
- escape = '\\'
-)
-
-func marshalVarArray(dest, src []byte) []byte {
- if bytes.IndexByte(src, entityDelimiter) < 0 && bytes.IndexByte(src,
escape) < 0 {
- dest = append(dest, src...)
- dest = append(dest, entityDelimiter)
- return dest
- }
- for _, b := range src {
- if b == entityDelimiter || b == escape {
- dest = append(dest, escape)
- }
- dest = append(dest, b)
- }
- dest = append(dest, entityDelimiter)
- return dest
-}
-
func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) {
if len(src) == 0 {
return nil, nil, errors.New("empty entity value")
}
- if src[0] == entityDelimiter {
+ if src[0] == encoding.EntityDelimiter {
return dest, src[1:], nil
}
for len(src) > 0 {
switch {
- case src[0] == escape:
+ case src[0] == encoding.Escape:
if len(src) < 2 {
return nil, nil, errors.New("invalid escape
character")
}
src = src[1:]
dest = append(dest, src[0])
- case src[0] == entityDelimiter:
+ case src[0] == encoding.EntityDelimiter:
return dest, src[1:], nil
default:
dest = append(dest, src[0])
@@ -134,19 +99,6 @@ func unmarshalVarArray(dest, src []byte) ([]byte, []byte,
error) {
return nil, nil, errors.New("invalid variable array")
}
-type tagValues struct {
- tag string
- values []*tagValue
-}
-
-func (t *tagValues) reset() {
- t.tag = ""
- for i := range t.values {
- releaseTagValue(t.values[i])
- }
- t.values = t.values[:0]
-}
-
type traces struct {
traceIDs []string
timestamps []int64
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 9873b4d8..ed3c74dc 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -212,15 +212,11 @@ func (s *syncCallback) handleTraceFileChunk(ctx
*queue.ChunkedSyncPartContext, c
partCtx.writers.spanWriter.MustWrite(chunk)
case fileName == traceIDFilterFilename:
if partCtx.memPart != nil {
- if err := s.handleTraceIDFilterChunk(partCtx, chunk);
err != nil {
- return fmt.Errorf("failed to handle traceID
filter chunk: %w", err)
- }
+ s.handleTraceIDFilterChunk(partCtx, chunk)
}
case fileName == tagTypeFilename:
if partCtx.memPart != nil {
- if err := s.handleTagTypeChunk(partCtx, chunk); err !=
nil {
- return fmt.Errorf("failed to handle tag type
chunk: %w", err)
- }
+ s.handleTagTypeChunk(partCtx, chunk)
}
case strings.HasPrefix(fileName, traceTagsPrefix):
tagName := fileName[len(traceTagsPrefix):]
@@ -237,12 +233,10 @@ func (s *syncCallback) handleTraceFileChunk(ctx
*queue.ChunkedSyncPartContext, c
return nil
}
-func (s *syncCallback) handleTraceIDFilterChunk(partCtx *syncPartContext,
chunk []byte) error {
+func (s *syncCallback) handleTraceIDFilterChunk(partCtx *syncPartContext,
chunk []byte) {
partCtx.traceIDFilterBuffer = append(partCtx.traceIDFilterBuffer,
chunk...)
- return nil
}
-func (s *syncCallback) handleTagTypeChunk(partCtx *syncPartContext, chunk
[]byte) error {
+func (s *syncCallback) handleTagTypeChunk(partCtx *syncPartContext, chunk
[]byte) {
partCtx.tagTypeBuffer = append(partCtx.tagTypeBuffer, chunk...)
- return nil
}
diff --git a/banyand/trace/write_standalone.go
b/banyand/trace/write_standalone.go
index da0e0214..eece4100 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -225,11 +225,19 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable
*tracesInTable, writeEv
sidxTags := make([]sidx.Tag, 0, len(tags))
for _, tag := range tags {
- sidxTags = append(sidxTags, sidx.Tag{
- Name: tag.tag,
- Value: tag.value,
- ValueType: tag.valueType,
- })
+ if tag.valueArr != nil {
+ sidxTags = append(sidxTags, sidx.Tag{
+ Name: tag.tag,
+ ValueArr: tag.valueArr,
+ ValueType: tag.valueType,
+ })
+ } else {
+ sidxTags = append(sidxTags, sidx.Tag{
+ Name: tag.tag,
+ Value: tag.value,
+ ValueType: tag.valueType,
+ })
+ }
}
indexRules := stm.GetIndexRules()
diff --git a/test/cases/trace/data/input/having_query_tag_cond.yml
b/test/cases/trace/data/input/having_query_tag_cond.yml
new file mode 100644
index 00000000..17540a9a
--- /dev/null
+++ b/test/cases/trace/data/input/having_query_tag_cond.yml
@@ -0,0 +1,30 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "zipkin"
+groups: ["zipkinTrace"]
+tag_projection: ["trace_id", "span_id", "operation_name", "query"]
+order_by:
+ index_rule_name: "zipkin-timestamp"
+ sort: "SORT_DESC"
+criteria:
+ condition:
+ name: "query"
+ op: "BINARY_OP_HAVING"
+ value:
+ strArray:
+ value: ["SELECT * FROM users"]
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index fd767504..a9d91b05 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -53,4 +53,5 @@ var _ = g.DescribeTable("Scanning Traces", func(args
helpers.Args) {
g.Entry("filter by trace id and service unknown", helpers.Args{Input:
"eq_trace_id_and_service_unknown", Duration: 1 * time.Hour, WantEmpty: true}),
g.Entry("filter by query", helpers.Args{Input: "having_query_tag",
Duration: 1 * time.Hour}),
g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 *
time.Hour, WantErr: true}),
+ g.Entry("filter by query with having condition", helpers.Args{Input:
"having_query_tag_cond", Want: "having_query_tag", Duration: 1 * time.Hour}),
)