This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 8ccede21 Add unit tests for sidx merge operation (#749)
8ccede21 is described below
commit 8ccede211b78883c47bd4792807643dd9efba291
Author: Huang Youliang <[email protected]>
AuthorDate: Thu Sep 4 21:23:24 2025 +0800
Add unit tests for sidx merge operation (#749)
---
banyand/internal/sidx/block.go | 20 +-
banyand/internal/sidx/merge.go | 4 +-
banyand/internal/sidx/merge_test.go | 431 ++++++++++++++++++++++++++++++++++++
banyand/internal/sidx/part_iter.go | 10 +-
4 files changed, 449 insertions(+), 16 deletions(-)
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index 22473bad..a24c453a 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -404,13 +404,12 @@ func fastTagAppend(bi, b *blockPointer, offset int) error {
if len(bi.tags) != len(b.tags) {
return fmt.Errorf("unexpected number of tags: got %d; want %d",
len(b.tags), len(bi.tags))
}
- for i := range bi.tags {
- if bi.tags[i].name != b.tags[i].name {
- return fmt.Errorf("unexpected tag name for tag %q: got %q; want %q",
- bi.tags[i].name, b.tags[i].name, bi.tags[i].name)
+ for _, t := range bi.tags {
+ if _, exists := b.tags[t.name]; !exists {
+ return fmt.Errorf("unexpected tag name for tag %q",
t.name)
}
- assertIdxAndOffset(b.tags[i].name, len(b.tags[i].values),
b.idx, offset)
- bi.tags[i].values = append(bi.tags[i].values,
b.tags[i].values[b.idx:offset]...)
+ assertIdxAndOffset(t.name, len(b.tags[t.name].values), b.idx,
offset)
+ bi.tags[t.name].values = append(bi.tags[t.name].values,
b.tags[t.name].values[b.idx:offset]...)
}
return nil
}
@@ -418,6 +417,9 @@ func fastTagAppend(bi, b *blockPointer, offset int) error {
func fullTagAppend(bi, b *blockPointer, offset int) {
existDataSize := len(bi.userKeys)
+ if bi.tags == nil {
+ bi.tags = make(map[string]*tagData)
+ }
if len(bi.tags) == 0 {
for _, t := range b.tags {
newTagData := tagData{name: t.name, valueType:
t.valueType}
@@ -452,10 +454,10 @@ func fullTagAppend(bi, b *blockPointer, offset int) {
}
emptySize := offset - b.idx
- for i := range bi.tags {
- if _, exists := sourceTags[bi.tags[i].name]; !exists {
+ for _, t := range bi.tags {
+ if _, exists := sourceTags[t.name]; !exists {
for j := 0; j < emptySize; j++ {
- bi.tags[i].values = append(bi.tags[i].values,
nil)
+ bi.tags[t.name].values =
append(bi.tags[t.name].values, nil)
}
}
}
diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go
index eb397b64..d1d3c608 100644
--- a/banyand/internal/sidx/merge.go
+++ b/banyand/internal/sidx/merge.go
@@ -120,13 +120,13 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa
var decoder *encoding.BytesBlockDecoder
getDecoder := func() *encoding.BytesBlockDecoder {
if decoder == nil {
- decoder = generateColumnValuesDecoder()
+ decoder = generateTagValuesDecoder()
}
return decoder
}
releaseDecoder := func() {
if decoder != nil {
- releaseColumnValuesDecoder(decoder)
+ releaseTagValuesDecoder(decoder)
decoder = nil
}
}
diff --git a/banyand/internal/sidx/merge_test.go b/banyand/internal/sidx/merge_test.go
new file mode 100644
index 00000000..a51976c8
--- /dev/null
+++ b/banyand/internal/sidx/merge_test.go
@@ -0,0 +1,431 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+ "encoding/binary"
+ "errors"
+ "path/filepath"
+ "reflect"
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "github.com/google/go-cmp/cmp/cmpopts"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func marshalStrArr(strArr [][]byte) []byte {
+ if len(strArr) == 0 {
+ return []byte{}
+ }
+ var result []byte
+ result = binary.LittleEndian.AppendUint32(result, uint32(len(strArr)))
+ for _, str := range strArr {
+ result = binary.LittleEndian.AppendUint32(result,
uint32(len(str)))
+ result = append(result, str...)
+ }
+ return result
+}
+
+var conventionalBlock = block{
+ userKeys: []int64{1, 2},
+ data: [][]byte{[]byte("data1"), []byte("data2")},
+ tags: map[string]*tagData{
+ "service": {
+ name: "service",
+ valueType: pbv1.ValueTypeStr,
+ values: [][]byte{[]byte("service1"),
[]byte("service2")},
+ },
+ },
+}
+
+var mergedBlock = block{
+ userKeys: []int64{1, 2, 3, 4},
+ data: [][]byte{[]byte("data1"), []byte("data2"), []byte("data3"),
[]byte("data4")},
+ tags: map[string]*tagData{
+ "arrTag": {
+ name: "arrTag",
+ valueType: pbv1.ValueTypeStrArr,
+ values: [][]byte{
+ marshalStrArr([][]byte{[]byte("value1"),
[]byte("value2")}),
+ marshalStrArr([][]byte{[]byte("value3"),
[]byte("value4")}),
+ marshalStrArr([][]byte{[]byte("value5"),
[]byte("value6")}),
+ marshalStrArr([][]byte{[]byte("value7"),
[]byte("value8")}),
+ },
+ },
+ },
+}
+
+var duplicatedMergedBlock = block{
+ userKeys: []int64{1, 2, 2, 3, 3, 4},
+ data: [][]byte{[]byte("data1"), []byte("data2"), []byte("data3"),
[]byte("data5"), []byte("data4"), []byte("data6")},
+ tags: map[string]*tagData{
+ "arrTag": {
+ name: "arrTag",
+ valueType: pbv1.ValueTypeStrArr,
+ values: [][]byte{
+ marshalStrArr([][]byte{[]byte("value1"),
[]byte("value2")}),
+ marshalStrArr([][]byte{[]byte("duplicated1")}),
+ marshalStrArr([][]byte{[]byte("value3"),
[]byte("value4")}),
+ marshalStrArr([][]byte{[]byte("value5"),
[]byte("value6")}),
+ marshalStrArr([][]byte{[]byte("duplicated2")}),
+ marshalStrArr([][]byte{[]byte("value7"),
[]byte("value8")}),
+ },
+ },
+ },
+}
+
+func Test_mergeTwoBlocks(t *testing.T) {
+ tests := []struct {
+ left *blockPointer
+ right *blockPointer
+ want *blockPointer
+ name string
+ }{
+ {
+ name: "Merge two empty blocks",
+ left: &blockPointer{},
+ right: &blockPointer{},
+ want: &blockPointer{},
+ },
+ {
+ name: "Merge left is non-empty right is empty",
+ left: &blockPointer{block: conventionalBlock},
+ right: &blockPointer{},
+ want: &blockPointer{block: conventionalBlock, bm:
blockMetadata{minKey: 1, maxKey: 2}},
+ },
+ {
+ name: "Merge left is empty right is non-empty",
+ left: &blockPointer{},
+ right: &blockPointer{block: conventionalBlock},
+ want: &blockPointer{block: conventionalBlock, bm:
blockMetadata{minKey: 1, maxKey: 2}},
+ },
+ {
+ name: "Merge two non-empty blocks without overlap",
+ left: &blockPointer{
+ block: block{
+ userKeys: []int64{1, 2},
+ data: [][]byte{[]byte("data1"),
[]byte("data2")},
+ tags: map[string]*tagData{
+ "arrTag": {
+ name: "arrTag",
+ valueType:
pbv1.ValueTypeStrArr,
+ values: [][]byte{
+
marshalStrArr([][]byte{[]byte("value1"), []byte("value2")}),
+
marshalStrArr([][]byte{[]byte("value3"), []byte("value4")}),
+ },
+ },
+ },
+ },
+ },
+ right: &blockPointer{
+ block: block{
+ userKeys: []int64{3, 4},
+ data: [][]byte{[]byte("data3"),
[]byte("data4")},
+ tags: map[string]*tagData{
+ "arrTag": {
+ name: "arrTag",
+ valueType:
pbv1.ValueTypeStrArr,
+ values: [][]byte{
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ },
+ },
+ },
+ },
+ },
+ want: &blockPointer{block: mergedBlock, bm:
blockMetadata{minKey: 1, maxKey: 4}},
+ },
+ {
+ name: "Merge two non-empty blocks without duplicated userKeys",
+ left: &blockPointer{
+ block: block{
+ userKeys: []int64{1, 3},
+ data: [][]byte{[]byte("data1"),
[]byte("data3")},
+ tags: map[string]*tagData{
+ "arrTag": {
+ name: "arrTag",
+ valueType:
pbv1.ValueTypeStrArr,
+ values: [][]byte{
+
marshalStrArr([][]byte{[]byte("value1"), []byte("value2")}),
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+ },
+ },
+ },
+ },
+ },
+ right: &blockPointer{
+ block: block{
+ userKeys: []int64{2, 4},
+ data: [][]byte{[]byte("data2"),
[]byte("data4")},
+ tags: map[string]*tagData{
+ "arrTag": {
+ name: "arrTag",
+ valueType:
pbv1.ValueTypeStrArr,
+ values: [][]byte{
+
marshalStrArr([][]byte{[]byte("value3"), []byte("value4")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ },
+ },
+ },
+ },
+ },
+ want: &blockPointer{block: mergedBlock, bm:
blockMetadata{minKey: 1, maxKey: 4}},
+ },
+ {
+ name: "Merge two non-empty blocks with duplicated userKeys",
+ left: &blockPointer{
+ block: block{
+ userKeys: []int64{1, 2, 3},
+ data: [][]byte{[]byte("data1"),
[]byte("data2"), []byte("data4")},
+ tags: map[string]*tagData{
+ "arrTag": {
+ name: "arrTag",
+ valueType:
pbv1.ValueTypeStrArr,
+ values: [][]byte{
+
marshalStrArr([][]byte{[]byte("value1"), []byte("value2")}),
+
marshalStrArr([][]byte{[]byte("duplicated1")}),
+
marshalStrArr([][]byte{[]byte("duplicated2")}),
+ },
+ },
+ },
+ },
+ },
+ right: &blockPointer{
+ block: block{
+ userKeys: []int64{2, 3, 4},
+ data: [][]byte{[]byte("data3"),
[]byte("data5"), []byte("data6")},
+ tags: map[string]*tagData{
+ "arrTag": {
+ name: "arrTag",
+ valueType:
pbv1.ValueTypeStrArr,
+ values: [][]byte{
+
marshalStrArr([][]byte{[]byte("value3"), []byte("value4")}),
+
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ },
+ },
+ },
+ },
+ },
+ want: &blockPointer{block: duplicatedMergedBlock, bm:
blockMetadata{minKey: 1, maxKey: 4}},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ target := &blockPointer{}
+ mergeTwoBlocks(target, tt.left, tt.right)
+ if !reflect.DeepEqual(target, tt.want) {
+ t.Errorf("mergeTwoBlocks() = %v, want %v",
target, tt.want)
+ }
+ })
+ }
+}
+
+func generateHugeElements(start, end int64, seriesIDBase common.SeriesID)
*elements {
+ es := generateElements()
+ for i := start; i <= end; i++ {
+ seriesID := seriesIDBase
+ if i%1000 == 0 {
+ seriesID = seriesIDBase + 1
+ }
+ if i%2000 == 0 {
+ seriesID = seriesIDBase + 2
+ }
+
+ tags := []Tag{
+ {Name: "service", Value: []byte("test-service"),
ValueType: pbv1.ValueTypeStr},
+ }
+ data := make([]byte, 50)
+ es.mustAppend(seriesID, i, data, tags)
+ }
+ return es
+}
+
+var (
+ es1 = func() *elements {
+ es := generateElements()
+ es.mustAppend(1, 100, make([]byte, 1600), []Tag{
+ {Name: "service", Value: []byte("service1"), ValueType:
pbv1.ValueTypeStr},
+ })
+ es.mustAppend(2, 200, make([]byte, 40), []Tag{
+ {Name: "env", Value: []byte("prod"), ValueType:
pbv1.ValueTypeStr},
+ })
+ es.mustAppend(3, 300, make([]byte, 25), []Tag{
+ {Name: "region", Value: []byte("us"), ValueType:
pbv1.ValueTypeStr},
+ })
+ return es
+ }()
+
+ es2 = func() *elements {
+ es := generateElements()
+ es.mustAppend(1, 150, make([]byte, 1600), []Tag{
+ {Name: "service", Value: []byte("service1"), ValueType:
pbv1.ValueTypeStr},
+ })
+ es.mustAppend(2, 250, make([]byte, 40), []Tag{
+ {Name: "env", Value: []byte("prod"), ValueType:
pbv1.ValueTypeStr},
+ })
+ es.mustAppend(3, 350, make([]byte, 25), []Tag{
+ {Name: "region", Value: []byte("us"), ValueType:
pbv1.ValueTypeStr},
+ })
+ return es
+ }()
+)
+
+func Test_mergeParts(t *testing.T) {
+ tests := []struct {
+ wantErr error
+ name string
+ esList []*elements
+ want []blockMetadata
+ }{
+ {
+ name: "Test with no element",
+ esList: []*elements{},
+ wantErr: errNoPartToMerge,
+ },
+ {
+ name: "Test with single part",
+ esList: []*elements{es1},
+ want: []blockMetadata{
+ {seriesID: 1, count: 1, uncompressedSize: 1623},
+ {seriesID: 2, count: 1, uncompressedSize: 55},
+ {seriesID: 3, count: 1, uncompressedSize: 41},
+ },
+ },
+ {
+ name: "Test with multiple parts with different userKeys",
+ esList: []*elements{es1, es2, es2},
+ want: []blockMetadata{
+ {seriesID: 1, count: 3, uncompressedSize: 4869},
+ {seriesID: 2, count: 3, uncompressedSize: 165},
+ {seriesID: 3, count: 3, uncompressedSize: 123},
+ },
+ },
+ {
+ name: "Test with multiple parts with same userKeys",
+ esList: []*elements{es1, es1, es1},
+ want: []blockMetadata{
+ {seriesID: 1, count: 3, uncompressedSize: 4869},
+ {seriesID: 2, count: 3, uncompressedSize: 165},
+ {seriesID: 3, count: 3, uncompressedSize: 123},
+ },
+ },
+ {
+ name: "Test with multiple parts with a large quantity of different userKeys",
+ esList: []*elements{generateHugeElements(1, 1000, 1),
generateHugeElements(1001, 2000, 2)},
+ want: []blockMetadata{
+ {seriesID: 1, count: 999, uncompressedSize:
76923},
+ {seriesID: 2, count: 1000, uncompressedSize:
77000},
+ {seriesID: 4, count: 1, uncompressedSize: 77},
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ verify := func(t *testing.T, pp []*partWrapper,
fileSystem fs.FileSystem, root string, partID uint64) {
+ closeCh := make(chan struct{})
+ defer close(closeCh)
+ s := &sidx{pm: protector.Nop{}}
+ p, err := s.mergeParts(fileSystem, closeCh, pp,
partID, root)
+ if tt.wantErr != nil {
+ if !errors.Is(err, tt.wantErr) {
+ t.Fatalf("Unexpected error: got %v, want %v", err, tt.wantErr)
+ }
+ return
+ }
+ defer func() {
+ if p != nil {
+ p.release()
+ }
+ }()
+ pmi := &partMergeIter{}
+ pmi.mustInitFromPart(p.p)
+ reader := &blockReader{}
+ reader.init([]*partMergeIter{pmi})
+ var got []blockMetadata
+ for reader.nextBlockMetadata() {
+ got = append(got, reader.block.bm)
+ }
+ require.NoError(t, reader.error())
+
+ if diff := cmp.Diff(got, tt.want,
+ cmpopts.IgnoreFields(blockMetadata{},
"tagsBlocks"),
+ cmpopts.IgnoreFields(blockMetadata{},
"tagProjection"),
+ cmpopts.IgnoreFields(blockMetadata{},
"dataBlock"),
+ cmpopts.IgnoreFields(blockMetadata{},
"keysBlock"),
+ cmpopts.IgnoreFields(blockMetadata{},
"minKey"),
+ cmpopts.IgnoreFields(blockMetadata{},
"maxKey"),
+ cmpopts.IgnoreFields(blockMetadata{},
"keysEncodeType"),
+ cmp.AllowUnexported(blockMetadata{}),
+ ); diff != "" {
+ t.Errorf("Unexpected blockMetadata (-got +want):\n%s", diff)
+ }
+ }
+
+ t.Run("memory parts", func(t *testing.T) {
+ var pp []*partWrapper
+ tmpPath, defFn := test.Space(require.New(t))
+ defer func() {
+ for _, pw := range pp {
+ pw.release()
+ }
+ defFn()
+ }()
+ for _, es := range tt.esList {
+ mp := generateMemPart()
+ mp.mustInitFromElements(es)
+ pp = append(pp, newPartWrapper(mp,
openMemPart(mp)))
+ }
+ verify(t, pp, fs.NewLocalFileSystem(), tmpPath,
1)
+ })
+
+ t.Run("file parts", func(t *testing.T) {
+ var fpp []*partWrapper
+ tmpPath, defFn := test.Space(require.New(t))
+ defer func() {
+ for _, pw := range fpp {
+ pw.release()
+ }
+ defFn()
+ }()
+ fileSystem := fs.NewLocalFileSystem()
+ for i, es := range tt.esList {
+ mp := generateMemPart()
+ mp.mustInitFromElements(es)
+ partPath := filepath.Join(tmpPath,
"part_"+string(rune('0'+i)))
+ mp.mustFlush(fileSystem, partPath)
+ filePart := mustOpenPart(partPath,
fileSystem)
+ filePW := newPartWrapper(nil, filePart)
+ filePW.p.partMetadata.ID = uint64(i)
+ fpp = append(fpp, filePW)
+ releaseMemPart(mp)
+ }
+ verify(t, fpp, fileSystem, tmpPath,
uint64(len(tt.esList)))
+ })
+ })
+ }
+}
diff --git a/banyand/internal/sidx/part_iter.go b/banyand/internal/sidx/part_iter.go
index 87b9fbf4..7b095848 100644
--- a/banyand/internal/sidx/part_iter.go
+++ b/banyand/internal/sidx/part_iter.go
@@ -388,17 +388,17 @@ func (pih *partMergeIterHeap) Pop() interface{} {
return v
}
-func generateColumnValuesDecoder() *encoding.BytesBlockDecoder {
- v := columnValuesDecoderPool.Get()
+func generateTagValuesDecoder() *encoding.BytesBlockDecoder {
+ v := tagValuesDecoderPool.Get()
if v == nil {
return &encoding.BytesBlockDecoder{}
}
return v
}
-func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) {
+func releaseTagValuesDecoder(d *encoding.BytesBlockDecoder) {
d.Reset()
- columnValuesDecoderPool.Put(d)
+ tagValuesDecoderPool.Put(d)
}
-var columnValuesDecoderPool = pool.Register[*encoding.BytesBlockDecoder]("sidx-columnValuesDecoder")
+var tagValuesDecoderPool = pool.Register[*encoding.BytesBlockDecoder]("sidx-tagValuesDecoder")