This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug/sidx-bloom in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e4218337ce18fd05b54a4e3868bddf285e7f0d6d Author: Gao Hongtao <[email protected]> AuthorDate: Fri Sep 26 22:11:41 2025 +0800 Add support for variable-length array tags in encoding and sidx modules - Introduced `MarshalVarArray` function to handle marshaling of byte slices into a variable-length array format, including escaping of special characters. - Updated the `tag` struct to support an array of values (`valueArr`) in addition to single values. - Modified the `processTag` method to correctly handle and marshal array values during tag processing. - Enhanced unit tests to validate the new functionality for both encoding and querying of tags with array values. - Updated related test cases to ensure proper handling of new tag structures in various scenarios. --- banyand/internal/encoding/tag_encoder.go | 27 +++++++++++ banyand/internal/encoding/tag_encoder_test.go | 46 +++++++++++++++++++ banyand/internal/sidx/block.go | 20 ++++++--- banyand/internal/sidx/block_test.go | 48 +++++++++++++------- banyand/internal/sidx/element.go | 42 ++++++++++++++--- banyand/internal/sidx/interfaces.go | 48 +++++++++----------- banyand/internal/sidx/sidx_test.go | 52 +++++++++++++++++++++- banyand/internal/sidx/tag_test.go | 6 ++- banyand/trace/write_standalone.go | 18 +++++--- .../trace/data/input/having_query_tag_cond.yml | 30 +++++++++++++ test/cases/trace/trace.go | 1 + 11 files changed, 274 insertions(+), 64 deletions(-) diff --git a/banyand/internal/encoding/tag_encoder.go b/banyand/internal/encoding/tag_encoder.go index 78bcf0bf..4800572d 100644 --- a/banyand/internal/encoding/tag_encoder.go +++ b/banyand/internal/encoding/tag_encoder.go @@ -21,6 +21,8 @@ package encoding import ( + stdbytes "bytes" + "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" @@ -29,6 +31,13 @@ import ( "github.com/apache/skywalking-banyandb/pkg/pool" ) +const ( + // EntityDelimiter is the delimiter for entities in a variable-length array. + EntityDelimiter = '|' + // Escape is the escape character for entities in a variable-length array. + Escape = '\\' +) + var ( int64SlicePool = pool.Register[*[]int64]("tag-encoder-int64Slice") float64SlicePool = pool.Register[*[]float64]("tag-encoder-float64Slice") @@ -86,6 +95,24 @@ func releaseDictionary(d *encoding.Dictionary) { dictionaryPool.Put(d) } +// MarshalVarArray marshals a byte slice into a variable-length array format. +// It escapes delimiter and escape characters within the source slice. +func MarshalVarArray(dest, src []byte) []byte { + if stdbytes.IndexByte(src, EntityDelimiter) < 0 && stdbytes.IndexByte(src, Escape) < 0 { + dest = append(dest, src...) + dest = append(dest, EntityDelimiter) + return dest + } + for _, b := range src { + if b == EntityDelimiter || b == Escape { + dest = append(dest, Escape) + } + dest = append(dest, b) + } + dest = append(dest, EntityDelimiter) + return dest +} + // EncodeTagValues encodes tag values based on the value type with optimal compression. // For int64: uses delta encoding with first value storage. // For float64: converts to decimal integers with exponent, then delta encoding. diff --git a/banyand/internal/encoding/tag_encoder_test.go b/banyand/internal/encoding/tag_encoder_test.go index 52e736ed..fc6aa069 100644 --- a/banyand/internal/encoding/tag_encoder_test.go +++ b/banyand/internal/encoding/tag_encoder_test.go @@ -198,3 +198,49 @@ func TestEncodeDecodeTagValues_Int64_EmptyInput(t *testing.T) { require.NoError(t, err) assert.Nil(t, decoded) } + +func TestMarshalVarArray(t *testing.T) { + tests := []struct { + name string + input []byte + expected []byte + }{ + { + name: "empty", + input: []byte{}, + expected: []byte{'|'}, + }, + { + name: "no special chars", + input: []byte("abc"), + expected: []byte("abc|"), + }, + { + name: "with delimiter", + input: []byte("a|b"), + expected: []byte("a\\|b|"), + }, + { + name: "with escape", + input: []byte("a\\b"), + expected: []byte("a\\\\b|"), + }, + { + name: "with delimiter and escape", + input: []byte("a|\\b"), + expected: []byte("a\\|\\\\b|"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, MarshalVarArray(nil, tt.input)) + }) + } + + t.Run("multiple values", func(t *testing.T) { + var result []byte + result = MarshalVarArray(result, []byte("a|b")) + result = MarshalVarArray(result, []byte("c\\d")) + assert.Equal(t, []byte("a\\|b|c\\\\d|"), result) + }) +} diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go index 524fdb78..55dd22f3 100644 --- a/banyand/internal/sidx/block.go +++ b/banyand/internal/sidx/block.go @@ -112,13 +112,12 @@ func (b *block) processTag(tagName string, elementTags [][]*tag) { td.values = make([][]byte, len(b.userKeys)) var valueType pbv1.ValueType - // Collect values for this tag across all elements for i, tags := range elementTags { found := false for _, tag := range tags { if tag.name == tagName { - td.values[i] = tag.value + td.values[i] = tag.marshal() valueType = tag.valueType found = true break @@ -133,9 +132,20 @@ func (b *block) processTag(tagName string, elementTags [][]*tag) { // Create bloom filter for indexed tags td.filter = generateBloomFilter(len(b.userKeys)) - for _, value := range td.values { - if value != nil { - td.filter.Add(value) + for _, tags := range elementTags { + for _, tag := range tags { + if tag.name == tagName { + if tag.valueArr != nil { + for _, v := range tag.valueArr { + if v != nil { + td.filter.Add(v) + } + } + } else if tag.value != nil { + td.filter.Add(tag.value) + } + break + } } } diff --git a/banyand/internal/sidx/block_test.go b/banyand/internal/sidx/block_test.go index b3ed92c9..64627ef4 100644 --- a/banyand/internal/sidx/block_test.go +++ b/banyand/internal/sidx/block_test.go @@ -19,6 +19,10 @@ package sidx import ( "testing" + + "github.com/stretchr/testify/assert" + + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) func TestBlock_BasicOperations(t *testing.T) { @@ -127,26 +131,36 @@ func TestBlock_KeyRange(t *testing.T) { } } -func TestBlock_MemoryManagement(t *testing.T) { +func TestBlock_ProcessTag_WithArrValues(t *testing.T) { b := generateBlock() defer releaseBlock(b) - // Add some normal-sized data - b.data = append(b.data, make([]byte, 100), make([]byte, 200)) - - // Add an oversized slice (larger than maxPooledSliceSize) - oversizedData := make([]byte, maxPooledSliceSize+1) - b.data = append(b.data, oversizedData) - - // Reset should handle both normal and oversized slices correctly - b.reset() - - // After reset, data slice should be empty but not nil (since the outer slice is within limits) - if b.data == nil { - t.Error("Data slice should not be nil after reset when within count limits") + b.userKeys = []int64{100, 101} + elementTags := [][]*tag{ + { + { + name: "arr_tag", + valueArr: [][]byte{ + []byte("a"), + []byte("b"), + }, + valueType: pbv1.ValueTypeStrArr, + }, + }, + { + { + name: "arr_tag", + value: []byte("c"), + valueType: pbv1.ValueTypeStr, + }, + }, } - if len(b.data) != 0 { - t.Errorf("Data slice should be empty after reset, got length %d", len(b.data)) - } + b.processTag("arr_tag", elementTags) + + assert.Equal(t, "a|b|", string(b.tags["arr_tag"].values[0])) + assert.Equal(t, "c", string(b.tags["arr_tag"].values[1])) + assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("a"))) + assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("b"))) + assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("c"))) } diff --git a/banyand/internal/sidx/element.go b/banyand/internal/sidx/element.go index 724df953..d9987e7e 100644 --- a/banyand/internal/sidx/element.go +++ b/banyand/internal/sidx/element.go @@ -22,18 +22,16 @@ package sidx import ( "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/encoding" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" ) -const ( - maxPooledSliceSize = 1024 * 1024 // 1MB -) - // tag represents an individual tag (not tag family like stream). type tag struct { name string value []byte + valueArr [][]byte valueType pbv1.ValueType } @@ -49,9 +47,26 @@ type elements struct { func (t *tag) reset() { t.name = "" t.value = nil + t.valueArr = nil t.valueType = pbv1.ValueTypeUnknown } +// marshal marshals the tag value to a byte slice. +func (t *tag) marshal() []byte { + if t.valueArr != nil { + var dst []byte + for i := range t.valueArr { + if t.valueType == pbv1.ValueTypeInt64Arr { + dst = append(dst, t.valueArr[i]...) + continue + } + dst = encoding.MarshalVarArray(dst, t.valueArr[i]) + } + return dst + } + return t.value +} + // reset elements collection for pooling. func (e *elements) reset() { e.seriesIDs = e.seriesIDs[:0] @@ -73,7 +88,15 @@ func (e *elements) reset() { // size returns the size of the tag in bytes. func (t *tag) size() int { - return len(t.name) + len(t.value) + 1 // +1 for valueType + size := len(t.name) + 1 // +1 for valueType + if t.valueArr != nil { + for _, v := range t.valueArr { + size += len(v) + } + } else { + size += len(t.value) + } + return size } // size returns the total size of all elements. @@ -166,7 +189,14 @@ func (e *elements) mustAppend(seriesID common.SeriesID, userKey int64, data []by for _, t := range tags { newTag := generateTag() newTag.name = t.Name - newTag.value = append([]byte(nil), t.Value...) + if t.ValueArr != nil { + newTag.valueArr = make([][]byte, len(t.ValueArr)) + for i, v := range t.ValueArr { + newTag.valueArr[i] = append(newTag.valueArr[i][:0], v...) + } + } else { + newTag.value = append(newTag.value[:0], t.Value...) + } newTag.valueType = t.ValueType elementTags = append(elementTags, newTag) } diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go index 94cf6d29..8b61c802 100644 --- a/banyand/internal/sidx/interfaces.go +++ b/banyand/internal/sidx/interfaces.go @@ -342,28 +342,29 @@ func (rm *ResponseMetadata) Validate() error { type Tag struct { Name string Value []byte + ValueArr [][]byte ValueType pbv1.ValueType } -// NewTag creates a new Tag instance with the given values. -func NewTag(name string, value []byte, valueType pbv1.ValueType) Tag { - return Tag{ - Name: name, - Value: value, - ValueType: valueType, - } -} - // Reset resets the Tag to its zero state for reuse. func (t *Tag) Reset() { t.Name = "" t.Value = nil + t.ValueArr = nil t.ValueType = pbv1.ValueTypeUnknown } // Size returns the size of the tag in bytes. func (t *Tag) Size() int { - return len(t.Name) + len(t.Value) + 1 // +1 for valueType + size := len(t.Name) + 1 // +1 for valueType + if t.ValueArr != nil { + for _, v := range t.ValueArr { + size += len(v) + } + } else { + size += len(t.Value) + } + return size } // Copy creates a deep copy of the Tag. @@ -373,31 +374,22 @@ func (t *Tag) Copy() Tag { valueCopy = make([]byte, len(t.Value)) copy(valueCopy, t.Value) } + var valueArrCopy [][]byte + if t.ValueArr != nil { + valueArrCopy = make([][]byte, len(t.ValueArr)) + for i, v := range t.ValueArr { + valueArrCopy[i] = make([]byte, len(v)) + copy(valueArrCopy[i], v) + } + } return Tag{ Name: t.Name, Value: valueCopy, + ValueArr: valueArrCopy, ValueType: t.ValueType, } } -// toInternalTag converts the exported Tag to an internal tag for use with the pooling system. -func (t *Tag) toInternalTag() *tag { - return &tag{ - name: t.Name, - value: t.Value, - valueType: t.ValueType, - } -} - -// fromInternalTag creates a Tag from an internal tag. -func fromInternalTag(t *tag) Tag { - return Tag{ - Name: t.name, - Value: t.value, - ValueType: t.valueType, - } -} - // Validate validates a WriteRequest for correctness. func (wr WriteRequest) Validate() error { if wr.SeriesID == 0 { diff --git a/banyand/internal/sidx/sidx_test.go b/banyand/internal/sidx/sidx_test.go index 95aef2b0..1025cad1 100644 --- a/banyand/internal/sidx/sidx_test.go +++ b/banyand/internal/sidx/sidx_test.go @@ -35,10 +35,9 @@ import ( "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" ) -const partIDForTesting = 1 - // Test helper functions. func waitForIntroducerLoop() { @@ -403,6 +402,55 @@ func TestSIDX_Query_Ordering(t *testing.T) { } } +func TestSIDX_Query_WithArrValues(t *testing.T) { + sidx := createTestSIDX(t) + defer func() { + assert.NoError(t, sidx.Close()) + }() + + ctx := context.Background() + + // Write test data with different keys + reqs := []WriteRequest{ + createTestWriteRequest(1, 100, "data100", Tag{ + Name: "arr_tag", + ValueArr: [][]byte{ + []byte("a"), + []byte("b"), + }, + ValueType: pbv1.ValueTypeStrArr, + }), + createTestWriteRequest(1, 150, "data150"), + createTestWriteRequest(1, 200, "data200"), + } + err := sidx.Write(ctx, reqs, 1) + require.NoError(t, err) + + // Wait for introducer loop to process + waitForIntroducerLoop() + + queryReq := QueryRequest{ + SeriesIDs: []common.SeriesID{1}, + TagProjection: []model.TagProjection{ + { + Names: []string{"arr_tag"}, + }, + }, + } + + response, err := sidx.Query(ctx, queryReq) + require.NoError(t, err) + require.NotNil(t, response) + + assert.Equal(t, 3, response.Len()) + for i := 0; i < response.Len(); i++ { + if response.Keys[i] == 100 { + assert.Equal(t, "arr_tag", response.Tags[i][0].Name) + assert.Equal(t, "a|b|", string(response.Tags[i][0].Value)) + } + } +} + func TestSIDX_Query_Validation(t *testing.T) { sidx := createTestSIDX(t) defer func() { diff --git a/banyand/internal/sidx/tag_test.go b/banyand/internal/sidx/tag_test.go index 19ef70cf..2f96ff2e 100644 --- a/banyand/internal/sidx/tag_test.go +++ b/banyand/internal/sidx/tag_test.go @@ -41,7 +41,11 @@ func TestTagExportedFields(t *testing.T) { func TestNewTag(t *testing.T) { // Test the NewTag constructor function - tag := NewTag("service", []byte("order-service"), pbv1.ValueTypeStr) + tag := Tag{ + Name: "service", + Value: []byte("order-service"), + ValueType: pbv1.ValueTypeStr, + } assert.Equal(t, "service", tag.Name) assert.Equal(t, []byte("order-service"), tag.Value) diff --git a/banyand/trace/write_standalone.go b/banyand/trace/write_standalone.go index da0e0214..eece4100 100644 --- a/banyand/trace/write_standalone.go +++ b/banyand/trace/write_standalone.go @@ -225,11 +225,19 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable *tracesInTable, writeEv sidxTags := make([]sidx.Tag, 0, len(tags)) for _, tag := range tags { - sidxTags = append(sidxTags, sidx.Tag{ - Name: tag.tag, - Value: tag.value, - ValueType: tag.valueType, - }) + if tag.valueArr != nil { + sidxTags = append(sidxTags, sidx.Tag{ + Name: tag.tag, + ValueArr: tag.valueArr, + ValueType: tag.valueType, + }) + } else { + sidxTags = append(sidxTags, sidx.Tag{ + Name: tag.tag, + Value: tag.value, + ValueType: tag.valueType, + }) + } } indexRules := stm.GetIndexRules() diff --git a/test/cases/trace/data/input/having_query_tag_cond.yml b/test/cases/trace/data/input/having_query_tag_cond.yml new file mode 100644 index 00000000..17540a9a --- /dev/null +++ b/test/cases/trace/data/input/having_query_tag_cond.yml @@ -0,0 +1,30 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "zipkin" +groups: ["zipkinTrace"] +tag_projection: ["trace_id", "span_id", "operation_name", "query"] +order_by: + index_rule_name: "zipkin-timestamp" + sort: "SORT_DESC" +criteria: + condition: + name: "query" + op: "BINARY_OP_HAVING" + value: + strArray: + value: ["SELECT * FROM users"] diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go index fd767504..a9d91b05 100644 --- a/test/cases/trace/trace.go +++ b/test/cases/trace/trace.go @@ -53,4 +53,5 @@ var _ = g.DescribeTable("Scanning Traces", func(args helpers.Args) { g.Entry("filter by trace id and service unknown", helpers.Args{Input: "eq_trace_id_and_service_unknown", Duration: 1 * time.Hour, WantEmpty: true}), g.Entry("filter by query", helpers.Args{Input: "having_query_tag", Duration: 1 * time.Hour}), g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * time.Hour, WantErr: true}), + g.Entry("filter by query with having condition", helpers.Args{Input: "having_query_tag_cond", Want: "having_query_tag", Duration: 1 * time.Hour}), )
