This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 9bf9b697 Remove Bloom filter for dictionary-encoded tags (#849)
9bf9b697 is described below
commit 9bf9b697d4a930f2912803940cd9d8610d33a6f7
Author: Huang Youliang <[email protected]>
AuthorDate: Tue Nov 25 08:39:54 2025 +0800
Remove Bloom filter for dictionary-encoded tags (#849)
* Remove Bloom filter on dictionary-encoded tags for stream
---
CHANGES.md | 6 ++
banyand/internal/encoding/tag_encoder.go | 24 ++---
banyand/internal/encoding/tag_encoder_test.go | 15 ++-
banyand/internal/sidx/block.go | 27 ++---
banyand/internal/sidx/tag.go | 25 ++++-
banyand/internal/sidx/tag_filter_op.go | 77 ++++++++------
banyand/internal/sidx/tag_filter_op_test.go | 42 ++++----
banyand/stream/block.go | 26 +++--
banyand/stream/part_iter.go | 2 +-
banyand/stream/tag.go | 43 +++++---
banyand/stream/tag_filter.go | 68 +++++++++---
banyand/stream/tag_filter_test.go | 2 +-
banyand/trace/tag.go | 2 +-
banyand/trace/tag_test.go | 10 +-
pkg/encoding/dictionary.go | 47 +++++++++
pkg/filter/dictionary_filter.go | 144 ++++++++++++++++++++++++++
pkg/filter/dictionary_filter_test.go | 132 +++++++++++++++++++++++
17 files changed, 558 insertions(+), 134 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index bb68d5f0..4877d210 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,12 @@
Release Notes.
+## 0.10.0
+
+### Features
+
+- Remove Bloom filter for dictionary-encoded tags.
+
## 0.9.0
### Features
diff --git a/banyand/internal/encoding/tag_encoder.go
b/banyand/internal/encoding/tag_encoder.go
index 2e375367..7197906b 100644
--- a/banyand/internal/encoding/tag_encoder.go
+++ b/banyand/internal/encoding/tag_encoder.go
@@ -144,9 +144,9 @@ func UnmarshalVarArray(dest, src []byte) ([]byte, []byte,
error) {
// For int64: uses delta encoding with first value storage.
// For float64: converts to decimal integers with exponent, then delta
encoding.
// For other types: uses dictionary encoding, falls back to plain with zstd
compression.
-func EncodeTagValues(bb *bytes.Buffer, values [][]byte, valueType
pbv1.ValueType) error {
+func EncodeTagValues(bb *bytes.Buffer, values [][]byte, valueType
pbv1.ValueType) (encoding.EncodeType, error) {
if len(values) == 0 {
- return nil
+ return encoding.EncodeTypeUnknown, nil
}
switch valueType {
@@ -175,7 +175,7 @@ func DecodeTagValues(dst [][]byte, decoder
*encoding.BytesBlockDecoder, bb *byte
}
}
-func encodeInt64TagValues(bb *bytes.Buffer, values [][]byte) error {
+func encodeInt64TagValues(bb *bytes.Buffer, values [][]byte)
(encoding.EncodeType, error) {
intValuesPtr := generateInt64Slice(len(values))
intValues := *intValuesPtr
defer releaseInt64Slice(intValuesPtr)
@@ -187,7 +187,7 @@ func encodeInt64TagValues(bb *bytes.Buffer, values
[][]byte) error {
bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
// Prepend EncodeTypePlain at the head of compressed
data
bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)},
bb.Buf...)
- return nil
+ return encoding.EncodeTypePlain, nil
}
if len(v) != 8 {
logger.Panicf("invalid value length at index %d:
expected 8 bytes, got %d", i, len(v))
@@ -207,10 +207,10 @@ func encodeInt64TagValues(bb *bytes.Buffer, values
[][]byte) error {
append([]byte{byte(encodeType)}, firstValueBytes...),
bb.Buf...,
)
- return nil
+ return encodeType, nil
}
-func encodeFloat64TagValues(bb *bytes.Buffer, values [][]byte) error {
+func encodeFloat64TagValues(bb *bytes.Buffer, values [][]byte)
(encoding.EncodeType, error) {
intValuesPtr := generateInt64Slice(len(values))
intValues := *intValuesPtr
defer releaseInt64Slice(intValuesPtr)
@@ -227,7 +227,7 @@ func encodeFloat64TagValues(bb *bytes.Buffer, values
[][]byte) error {
bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
// Prepend EncodeTypePlain at the head of compressed
data
bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)},
bb.Buf...)
- return nil
+ return encoding.EncodeTypePlain, nil
}
if len(v) != 8 {
logger.Panicf("invalid value length at index %d:
expected 8 bytes, got %d", i, len(v))
@@ -241,7 +241,7 @@ func encodeFloat64TagValues(bb *bytes.Buffer, values
[][]byte) error {
bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
// Prepend EncodeTypePlain at the head of compressed data
bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)},
bb.Buf...)
- return nil
+ return encoding.EncodeTypePlain, nil
}
var firstValue int64
@@ -257,10 +257,10 @@ func encodeFloat64TagValues(bb *bytes.Buffer, values
[][]byte) error {
append(append([]byte{byte(encodeType)}, expBytes...),
firstValueBytes...),
bb.Buf...,
)
- return nil
+ return encodeType, nil
}
-func encodeDefaultTagValues(bb *bytes.Buffer, values [][]byte) error {
+func encodeDefaultTagValues(bb *bytes.Buffer, values [][]byte)
(encoding.EncodeType, error) {
dict := generateDictionary()
defer releaseDictionary(dict)
@@ -269,14 +269,14 @@ func encodeDefaultTagValues(bb *bytes.Buffer, values
[][]byte) error {
// Dictionary encoding failed, use plain encoding with
zstd compression
bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)},
bb.Buf...)
- return nil
+ return encoding.EncodeTypePlain, nil
}
}
// Dictionary encoding succeeded
bb.Buf = dict.Encode(bb.Buf[:0])
bb.Buf = append([]byte{byte(encoding.EncodeTypeDictionary)}, bb.Buf...)
- return nil
+ return encoding.EncodeTypeDictionary, nil
}
func decodeInt64TagValues(dst [][]byte, decoder *encoding.BytesBlockDecoder,
bb *bytes.Buffer, count uint64) ([][]byte, error) {
diff --git a/banyand/internal/encoding/tag_encoder_test.go
b/banyand/internal/encoding/tag_encoder_test.go
index fc6aa069..aeebcf45 100644
--- a/banyand/internal/encoding/tag_encoder_test.go
+++ b/banyand/internal/encoding/tag_encoder_test.go
@@ -59,9 +59,10 @@ func TestEncodeDecodeTagValues_Int64_WithNilValues(t
*testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bb := &bytes.Buffer{}
- err := EncodeTagValues(bb, tt.values,
pbv1.ValueTypeInt64)
+ encodeType, err := EncodeTagValues(bb, tt.values,
pbv1.ValueTypeInt64)
require.NoError(t, err)
require.NotNil(t, bb.Buf)
+ require.Equal(t, pkgencoding.EncodeTypePlain,
encodeType)
decoder := &pkgencoding.BytesBlockDecoder{}
decoded, err := DecodeTagValues(nil, decoder, bb,
pbv1.ValueTypeInt64, len(tt.values))
@@ -109,9 +110,10 @@ func
TestEncodeDecodeTagValues_Int64_WithNullStringValues(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bb := &bytes.Buffer{}
- err := EncodeTagValues(bb, tt.values,
pbv1.ValueTypeInt64)
+ encodeType, err := EncodeTagValues(bb, tt.values,
pbv1.ValueTypeInt64)
require.NoError(t, err)
require.NotNil(t, bb.Buf)
+ require.Equal(t, pkgencoding.EncodeTypePlain,
encodeType)
decoder := &pkgencoding.BytesBlockDecoder{}
decoded, err := DecodeTagValues(nil, decoder, bb,
pbv1.ValueTypeInt64, len(tt.values))
@@ -140,9 +142,10 @@ func
TestEncodeDecodeTagValues_Int64_MixedNilAndNullString(t *testing.T) {
}
bb := &bytes.Buffer{}
- err := EncodeTagValues(bb, values, pbv1.ValueTypeInt64)
+ encodeType, err := EncodeTagValues(bb, values, pbv1.ValueTypeInt64)
require.NoError(t, err)
require.NotNil(t, bb.Buf)
+ require.Equal(t, pkgencoding.EncodeTypePlain, encodeType)
decoder := &pkgencoding.BytesBlockDecoder{}
decoded, err := DecodeTagValues(nil, decoder, bb, pbv1.ValueTypeInt64,
len(values))
@@ -173,9 +176,10 @@ func TestEncodeDecodeTagValues_Int64_ValidValues(t
*testing.T) {
}
bb := &bytes.Buffer{}
- err := EncodeTagValues(bb, values, pbv1.ValueTypeInt64)
+ encodeType, err := EncodeTagValues(bb, values, pbv1.ValueTypeInt64)
require.NoError(t, err)
require.NotNil(t, bb.Buf)
+ require.Equal(t, pkgencoding.EncodeTypeDelta, encodeType)
decoder := &pkgencoding.BytesBlockDecoder{}
decoded, err := DecodeTagValues(nil, decoder, bb, pbv1.ValueTypeInt64,
len(values))
@@ -189,9 +193,10 @@ func TestEncodeDecodeTagValues_Int64_ValidValues(t
*testing.T) {
func TestEncodeDecodeTagValues_Int64_EmptyInput(t *testing.T) {
bb := &bytes.Buffer{}
- err := EncodeTagValues(bb, nil, pbv1.ValueTypeInt64)
+ encodeType, err := EncodeTagValues(bb, nil, pbv1.ValueTypeInt64)
require.NoError(t, err)
assert.Nil(t, bb.Buf)
+ require.Equal(t, pkgencoding.EncodeTypeUnknown, encodeType)
decoder := &pkgencoding.BytesBlockDecoder{}
decoded, err := DecodeTagValues(nil, decoder, bb, pbv1.ValueTypeInt64,
0)
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index 180c2020..4c68bad3 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -279,7 +279,7 @@ func (b *block) mustWriteTag(tagName string, td *tagData,
bm *blockMetadata, ww
}()
// Encode tag values using the encoding module
- err := internalencoding.EncodeTagValues(bb, td.tmpBytes, td.valueType)
+ encodeType, err := internalencoding.EncodeTagValues(bb, td.tmpBytes,
td.valueType)
if err != nil {
panic(fmt.Sprintf("failed to encode tag values: %v", err))
}
@@ -350,22 +350,23 @@ func (b *block) mustWriteTag(tagName string, td *tagData,
bm *blockMetadata, ww
addUnique(td.values[i].value)
}
- bf := generateBloomFilter(len(uniqueValues))
- for v := range uniqueValues {
- bf.Add(convert.StringToBytes(v))
- }
-
- bb.Buf = encodeBloomFilter(bb.Buf[:0], bf)
- tm.filterBlock.offset = tfw.bytesWritten
- tm.filterBlock.size = uint64(len(bb.Buf))
- tfw.MustWrite(bb.Buf)
- releaseBloomFilter(bf)
-
- // Compute min/max for int64 tags during unique value iteration
if td.valueType == pbv1.ValueTypeInt64 && hasMinMax {
tm.min = encoding.Int64ToBytes(nil, minVal)
tm.max = encoding.Int64ToBytes(nil, maxVal)
}
+ isDictionaryEncoded := encodeType == encoding.EncodeTypeDictionary
+ if !isDictionaryEncoded {
+ bf := generateBloomFilter(len(uniqueValues))
+ for v := range uniqueValues {
+ bf.Add(convert.StringToBytes(v))
+ }
+
+ bb.Buf = encodeBloomFilter(bb.Buf[:0], bf)
+ tm.filterBlock.offset = tfw.bytesWritten
+ tm.filterBlock.size = uint64(len(bb.Buf))
+ tfw.MustWrite(bb.Buf)
+ releaseBloomFilter(bf)
+ }
// Marshal and write tag metadata
bb.Buf = bb.Buf[:0]
diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go
index a342860a..89b521dd 100644
--- a/banyand/internal/sidx/tag.go
+++ b/banyand/internal/sidx/tag.go
@@ -64,9 +64,10 @@ func (tr *tagRow) reset() {
}
var (
- tagDataPool = pool.Register[*tagData]("sidx-tagData")
- tagMetadataPool = pool.Register[*tagMetadata]("sidx-tagMetadata")
- bloomFilterPool = pool.Register[*filter.BloomFilter]("sidx-bloomFilter")
+ tagDataPool = pool.Register[*tagData]("sidx-tagData")
+ tagMetadataPool = pool.Register[*tagMetadata]("sidx-tagMetadata")
+ bloomFilterPool =
pool.Register[*filter.BloomFilter]("sidx-bloomFilter")
+ dictionaryFilterPool =
pool.Register[*filter.DictionaryFilter]("sidx-dictionaryFilter")
)
// generateTagData gets a tagData from pool or creates new.
@@ -159,6 +160,24 @@ func releaseBloomFilter(bf *filter.BloomFilter) {
bloomFilterPool.Put(bf)
}
+// generateDictionaryFilter gets a dictionary filter from pool or creates new.
+func generateDictionaryFilter() *filter.DictionaryFilter {
+ v := dictionaryFilterPool.Get()
+ if v == nil {
+ return &filter.DictionaryFilter{}
+ }
+ return v
+}
+
+// releaseDictionaryFilter returns dictionary filter to pool after reset.
+func releaseDictionaryFilter(df *filter.DictionaryFilter) {
+ if df == nil {
+ return
+ }
+ df.Reset()
+ dictionaryFilterPool.Put(df)
+}
+
// encodeBloomFilter encodes a bloom filter to bytes.
func encodeBloomFilter(dst []byte, bf *filter.BloomFilter) []byte {
if bf == nil {
diff --git a/banyand/internal/sidx/tag_filter_op.go
b/banyand/internal/sidx/tag_filter_op.go
index 2f3318cc..c2d434f5 100644
--- a/banyand/internal/sidx/tag_filter_op.go
+++ b/banyand/internal/sidx/tag_filter_op.go
@@ -37,12 +37,17 @@ type tagFilterOp struct {
tagCache map[string]*tagFilterCache
}
+// Filter is an interface for tag filtering.
+type Filter interface {
+ MightContain(item []byte) bool
+}
+
// tagFilterCache caches tag filter data for a specific tag.
type tagFilterCache struct {
- bloomFilter *filter.BloomFilter
- min []byte
- max []byte
- valueType pbv1.ValueType
+ filter Filter
+ min []byte
+ max []byte
+ valueType pbv1.ValueType
}
// Eq checks if a tag equals a specific value by reading tag data and checking
bloom filter.
@@ -64,12 +69,12 @@ func (tfo *tagFilterOp) Eq(tagName string, tagValue string)
bool {
return true // Conservative approach - don't filter out
}
- // Use bloom filter to check if the value might exist
- if cache.bloomFilter != nil {
- return cache.bloomFilter.MightContain([]byte(tagValue))
+ // Use filter to check if the value might exist
+ if cache.filter != nil {
+ return cache.filter.MightContain([]byte(tagValue))
}
- // If no bloom filter, conservatively return true
+ // If no filter, conservatively return true
return true
}
@@ -93,18 +98,18 @@ func (tfo *tagFilterOp) Having(tagName string, tagValues
[]string) bool {
return true // Conservative approach - don't filter out
}
- // Use bloom filter to check if any value might exist
- if cache.bloomFilter != nil {
+ // Use filter to check if any value might exist
+ if cache.filter != nil {
for _, tagValue := range tagValues {
- if cache.bloomFilter.MightContain([]byte(tagValue)) {
+ if cache.filter.MightContain([]byte(tagValue)) {
return true // Return true as soon as we find a
potential match
}
}
- // None of the values might exist in the bloom filter
+ // None of the values might exist in the filter
return false
}
- // If no bloom filter, conservatively return true
+ // If no filter, conservatively return true
return true
}
@@ -183,13 +188,30 @@ func (tfo *tagFilterOp) getTagFilterCache(tagName string,
tagBlock dataBlock) (*
copy(cache.min, tagMetadata.min)
copy(cache.max, tagMetadata.max)
- // Read bloom filter if available
+ // Read filter data
if tagMetadata.filterBlock.size > 0 {
bf, err := tfo.readBloomFilter(tagName, tagMetadata.filterBlock)
if err != nil {
return nil, fmt.Errorf("failed to read bloom filter:
%w", err)
}
- cache.bloomFilter = bf
+ cache.filter = bf
+ } else {
+ tagDataReader, _ := tfo.part.getTagDataReader(tagName)
+ encodeTypeBuf := make([]byte, 1)
+ fs.MustReadData(tagDataReader,
int64(tagMetadata.dataBlock.offset), encodeTypeBuf)
+ encodeType := encoding.EncodeType(encodeTypeBuf[0])
+ if encodeType == encoding.EncodeTypeDictionary {
+ tagData := make([]byte, tagMetadata.dataBlock.size)
+ fs.MustReadData(tagDataReader,
int64(tagMetadata.dataBlock.offset), tagData)
+ dictValues, err :=
encoding.DecodeDictionaryValues(tagData[1:])
+ if err != nil {
+ logger.Panicf("failed to extract dictionary
values for tag %s: %v", tagName, err)
+ }
+ df := generateDictionaryFilter()
+ df.SetValues(dictValues)
+ df.SetValueType(tagMetadata.valueType)
+ cache.filter = df
+ }
}
// Cache the result
@@ -241,29 +263,18 @@ func (tfo *tagFilterOp) readBloomFilter(tagName string,
filterBlock dataBlock) (
return decodeBloomFilter(filterData)
}
-// decodeBloomFilterFromBytes decodes bloom filter data (similar to stream
module).
-func decodeBloomFilterFromBytes(src []byte, bf *filter.BloomFilter)
*filter.BloomFilter {
- n := encoding.BytesToInt64(src)
- bf.SetN(int(n))
-
- // With B=16, use optimized bit shift calculation
- bits := make([]uint64, 0)
- bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:],
uint64(filter.OptimalBitsSize(int(n))))
- if err != nil {
- logger.Panicf("failed to decode Bloom filter: %v", err)
- }
- bf.SetBits(bits)
-
- return bf
-}
-
// reset resets the tagFilterOp for reuse.
func (tfo *tagFilterOp) reset() {
tfo.blockMetadata = nil
tfo.part = nil
for key, cache := range tfo.tagCache {
- if cache.bloomFilter != nil {
- releaseBloomFilter(cache.bloomFilter)
+ if cache.filter != nil {
+ switch f := cache.filter.(type) {
+ case *filter.BloomFilter:
+ releaseBloomFilter(f)
+ case *filter.DictionaryFilter:
+ releaseDictionaryFilter(f)
+ }
}
delete(tfo.tagCache, key)
}
diff --git a/banyand/internal/sidx/tag_filter_op_test.go
b/banyand/internal/sidx/tag_filter_op_test.go
index 611c900b..8357e9fc 100644
--- a/banyand/internal/sidx/tag_filter_op_test.go
+++ b/banyand/internal/sidx/tag_filter_op_test.go
@@ -69,10 +69,10 @@ func TestTagFilterOpReset(t *testing.T) {
part: &part{},
tagCache: map[string]*tagFilterCache{
"tag1": {
- bloomFilter: filter.NewBloomFilter(100),
- min: []byte("min"),
- max: []byte("max"),
- valueType: pbv1.ValueTypeStr,
+ filter: filter.NewBloomFilter(100),
+ min: []byte("min"),
+ max: []byte("max"),
+ valueType: pbv1.ValueTypeStr,
},
},
}
@@ -386,10 +386,11 @@ func TestDecodeBloomFilterFromBytes(t *testing.T) {
buf.Write(encodedBits)
// Test decoding
- decodedBF := filter.NewBloomFilter(0)
- result := decodeBloomFilterFromBytes(buf.Bytes(), decodedBF)
+ decodedBF, err := decodeBloomFilter(buf.Bytes())
+ assert.NoError(t, err)
// Verify decoded bloom filter
+ result := decodedBF
assert.Equal(t, originalBF.N(), result.N())
assert.True(t, result.MightContain([]byte("test-value-1")))
assert.True(t, result.MightContain([]byte("test-value-2")))
@@ -402,13 +403,13 @@ func TestTagFilterCacheIntegration(t *testing.T) {
// For now, we test the basic structure and error handling
t.Run("cache creation structure", func(t *testing.T) {
cache := &tagFilterCache{
- bloomFilter: filter.NewBloomFilter(100),
- min: []byte("min_value"),
- max: []byte("max_value"),
- valueType: pbv1.ValueTypeStr,
+ filter: filter.NewBloomFilter(100),
+ min: []byte("min_value"),
+ max: []byte("max_value"),
+ valueType: pbv1.ValueTypeStr,
}
- assert.NotNil(t, cache.bloomFilter)
+ assert.NotNil(t, cache.filter)
assert.Equal(t, []byte("min_value"), cache.min)
assert.Equal(t, []byte("max_value"), cache.max)
assert.Equal(t, pbv1.ValueTypeStr, cache.valueType)
@@ -487,8 +488,7 @@ func BenchmarkDecodeBloomFilterFromBytes(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- decodedBF := filter.NewBloomFilter(0)
- decodeBloomFilterFromBytes(testData, decodedBF)
+ decodeBloomFilter(testData)
}
}
@@ -500,8 +500,8 @@ func BenchmarkTagFilterOpHaving(b *testing.B) {
}
cache := &tagFilterCache{
- bloomFilter: bf,
- valueType: pbv1.ValueTypeStr,
+ filter: bf,
+ valueType: pbv1.ValueTypeStr,
}
tfo := &tagFilterOp{
@@ -667,8 +667,8 @@ func TestTagFilterOpHavingWithBloomFilter(t *testing.T) {
// Create cache with bloom filter
cache := &tagFilterCache{
- bloomFilter: bf,
- valueType: pbv1.ValueTypeStr,
+ filter: bf,
+ valueType: pbv1.ValueTypeStr,
}
tfo := &tagFilterOp{
@@ -750,8 +750,8 @@ func TestTagFilterOpHavingWithBloomFilter(t *testing.T) {
func TestTagFilterOpHavingWithoutBloomFilter(t *testing.T) {
// Create cache without bloom filter
cache := &tagFilterCache{
- bloomFilter: nil,
- valueType: pbv1.ValueTypeStr,
+ filter: nil,
+ valueType: pbv1.ValueTypeStr,
}
tfo := &tagFilterOp{
@@ -781,8 +781,8 @@ func TestTagFilterOpHavingLargeList(t *testing.T) {
bf.Add([]byte("target-service"))
cache := &tagFilterCache{
- bloomFilter: bf,
- valueType: pbv1.ValueTypeStr,
+ filter: bf,
+ valueType: pbv1.ValueTypeStr,
}
tfo := &tagFilterOp{
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index ddaa6ec7..1bcb7514 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -27,8 +27,8 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
pkgbytes "github.com/apache/skywalking-banyandb/pkg/bytes"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
- "github.com/apache/skywalking-banyandb/pkg/filter"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -108,13 +108,18 @@ func (b *block) processTags(tf tagValues, tagFamilyIdx, i
int, elementsLen int)
if !t.indexed {
continue
}
- if tags[j].filter == nil {
- filter := generateBloomFilter()
- tags[j].filter = filter
+ if tags[j].uniqueValues == nil {
+ tags[j].uniqueValues = make(map[string]struct{})
+ }
+ if t.valueArr != nil {
+ for _, v := range t.valueArr {
+ if v != nil {
+
tags[j].uniqueValues[convert.BytesToString(v)] = struct{}{}
+ }
+ }
+ } else if t.value != nil {
+ tags[j].uniqueValues[convert.BytesToString(t.value)] =
struct{}{}
}
- tags[j].filter.SetN(elementsLen)
- tags[j].filter.ResizeBits(filter.OptimalBitsSize(elementsLen))
- tags[j].filter.Add(t.value)
if t.valueType == pbv1.ValueTypeInt64 {
if len(tags[j].min) == 0 {
tags[j].min = t.value
@@ -397,13 +402,6 @@ func generateBlock() *block {
}
func releaseBlock(b *block) {
- for _, tf := range b.tagFamilies {
- for _, t := range tf.tags {
- if t.filter != nil {
- releaseBloomFilter(t.filter)
- }
- }
- }
b.reset()
blockPool.Put(b)
}
diff --git a/banyand/stream/part_iter.go b/banyand/stream/part_iter.go
index 1e1de90d..7918d33c 100644
--- a/banyand/stream/part_iter.go
+++ b/banyand/stream/part_iter.go
@@ -238,7 +238,7 @@ func (pi *partIter) findBlock() bool {
shouldSkip, err := func() (bool, error) {
tfs := generateTagFamilyFilters()
defer releaseTagFamilyFilters(tfs)
- tfs.unmarshal(bm.tagFamilies,
pi.p.tagFamilyMetadata, pi.p.tagFamilyFilter)
+ tfs.unmarshal(bm.tagFamilies,
pi.p.tagFamilyMetadata, pi.p.tagFamilyFilter, pi.p.tagFamilies)
return pi.blockFilter.ShouldSkip(tfs)
}()
if err != nil {
diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go
index d78bcb89..769c67c9 100644
--- a/banyand/stream/tag.go
+++ b/banyand/stream/tag.go
@@ -20,17 +20,21 @@ package stream
import (
internalencoding
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
"github.com/apache/skywalking-banyandb/pkg/bytes"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/filter"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)
type tag struct {
- tagFilter
- name string
- values [][]byte
- valueType pbv1.ValueType
+ uniqueValues map[string]struct{}
+ min []byte
+ max []byte
+ name string
+ values [][]byte
+ valueType pbv1.ValueType
}
func (t *tag) reset() {
@@ -42,7 +46,14 @@ func (t *tag) reset() {
}
t.values = values[:0]
- t.tagFilter.reset()
+ t.min = t.min[:0]
+ t.max = t.max[:0]
+
+ if t.uniqueValues != nil {
+ for v := range t.uniqueValues {
+ delete(t.uniqueValues, v)
+ }
+ }
}
func (t *tag) resizeValues(valuesLen int) [][]byte {
@@ -65,7 +76,7 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer,
tagFilterWriter *w
defer bigValuePool.Release(bb)
// Use shared encoding module
- err := internalencoding.EncodeTagValues(bb, t.values, t.valueType)
+ encodeType, err := internalencoding.EncodeTagValues(bb, t.values,
t.valueType)
if err != nil {
logger.Panicf("failed to encode tag values: %v", err)
}
@@ -77,14 +88,22 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter
*writer, tagFilterWriter *w
tm.offset = tagWriter.bytesWritten
tagWriter.MustWrite(bb.Buf)
- if t.filter != nil {
+ if tm.valueType == pbv1.ValueTypeInt64 && (t.min != nil || t.max !=
nil) {
+ tm.min = t.min
+ tm.max = t.max
+ }
+ isDictionaryEncoded := encodeType == pkgencoding.EncodeTypeDictionary
+ if len(t.uniqueValues) > 0 && !isDictionaryEncoded {
+ bf := generateBloomFilter()
+ defer releaseBloomFilter(bf)
+ bf.SetN(len(t.uniqueValues))
+ bf.ResizeBits(filter.OptimalBitsSize(len(t.uniqueValues)))
+ for v := range t.uniqueValues {
+ bf.Add(convert.StringToBytes(v))
+ }
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
- bb.Buf = encodeBloomFilter(bb.Buf[:0], t.filter)
- if tm.valueType == pbv1.ValueTypeInt64 {
- tm.min = t.min
- tm.max = t.max
- }
+ bb.Buf = encodeBloomFilter(bb.Buf[:0], bf)
tm.filterBlock.size = uint64(len(bb.Buf))
tm.filterBlock.offset = tagFilterWriter.bytesWritten
tagFilterWriter.MustWrite(bb.Buf)
diff --git a/banyand/stream/tag_filter.go b/banyand/stream/tag_filter.go
index 3a8748a5..c84eabf6 100644
--- a/banyand/stream/tag_filter.go
+++ b/banyand/stream/tag_filter.go
@@ -31,6 +31,11 @@ import (
"github.com/apache/skywalking-banyandb/pkg/pool"
)
+// Filter interface provides a unified interface for both BloomFilter and
DictionaryFilter.
+type Filter interface {
+ MightContain(item []byte) bool
+}
+
func encodeBloomFilter(dst []byte, bf *filter.BloomFilter) []byte {
dst = encoding.Int64ToBytes(dst, int64(bf.N()))
dst = encoding.EncodeUint64Block(dst, bf.Bits())
@@ -67,8 +72,23 @@ func releaseBloomFilter(bf *filter.BloomFilter) {
var bloomFilterPool = pool.Register[*filter.BloomFilter]("stream-bloomFilter")
+func generateDictionaryFilter() *filter.DictionaryFilter {
+ v := dictionaryFilterPool.Get()
+ if v == nil {
+ return &filter.DictionaryFilter{}
+ }
+ return v
+}
+
+func releaseDictionaryFilter(df *filter.DictionaryFilter) {
+ df.Reset()
+ dictionaryFilterPool.Put(df)
+}
+
+var dictionaryFilterPool =
pool.Register[*filter.DictionaryFilter]("stream-dictionaryFilter")
+
type tagFilter struct {
- filter *filter.BloomFilter
+ filter Filter
min []byte
max []byte
}
@@ -88,7 +108,14 @@ func generateTagFilter() *tagFilter {
}
func releaseTagFilter(tf *tagFilter) {
- releaseBloomFilter(tf.filter)
+ if tf.filter != nil {
+ switch f := tf.filter.(type) {
+ case *filter.BloomFilter:
+ releaseBloomFilter(f)
+ case *filter.DictionaryFilter:
+ releaseDictionaryFilter(f)
+ }
+ }
tf.reset()
tagFilterPool.Put(tf)
}
@@ -101,7 +128,7 @@ func (tff *tagFamilyFilter) reset() {
clear(*tff)
}
-func (tff tagFamilyFilter) unmarshal(tagFamilyMetadataBlock *dataBlock,
metaReader, filterReader fs.Reader) {
+func (tff tagFamilyFilter) unmarshal(tagFamilyMetadataBlock *dataBlock,
metaReader, filterReader fs.Reader, tagValueReader fs.Reader) {
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
bb.Buf = pkgbytes.ResizeExact(bb.Buf[:0],
int(tagFamilyMetadataBlock.size))
@@ -113,19 +140,34 @@ func (tff tagFamilyFilter)
unmarshal(tagFamilyMetadataBlock *dataBlock, metaRead
logger.Panicf("%s: cannot unmarshal tagFamilyMetadata: %v",
metaReader.Path(), err)
}
for _, tm := range tfm.tagMetadata {
- if tm.filterBlock.size == 0 {
- continue
- }
- bb.Buf = pkgbytes.ResizeExact(bb.Buf[:0],
int(tm.filterBlock.size))
- fs.MustReadData(filterReader, int64(tm.filterBlock.offset),
bb.Buf)
- bf := generateBloomFilter()
- bf = decodeBloomFilter(bb.Buf, bf)
tf := generateTagFilter()
- tf.filter = bf
if tm.valueType == pbv1.ValueTypeInt64 {
tf.min = tm.min
tf.max = tm.max
}
+ if tm.filterBlock.size > 0 {
+ bb.Buf = pkgbytes.ResizeExact(bb.Buf[:0],
int(tm.filterBlock.size))
+ fs.MustReadData(filterReader,
int64(tm.filterBlock.offset), bb.Buf)
+ bf := generateBloomFilter()
+ bf = decodeBloomFilter(bb.Buf, bf)
+ tf.filter = bf
+ } else {
+ encodeTypeBuf := make([]byte, 1)
+ fs.MustReadData(tagValueReader, int64(tm.offset),
encodeTypeBuf)
+ encodeType := encoding.EncodeType(encodeTypeBuf[0])
+ if encodeType == encoding.EncodeTypeDictionary {
+ bb.Buf = pkgbytes.ResizeExact(bb.Buf[:0],
int(tm.size))
+ fs.MustReadData(tagValueReader,
int64(tm.offset), bb.Buf)
+ dictValues, err :=
encoding.DecodeDictionaryValues(bb.Buf[1:])
+ if err != nil {
+ logger.Panicf("failed to extract
dictionary values: %v", err)
+ }
+ df := generateDictionaryFilter()
+ df.SetValues(dictValues)
+ df.SetValueType(tm.valueType)
+ tf.filter = df
+ }
+ }
tff[tm.name] = tf
}
}
@@ -156,10 +198,10 @@ func (tfs *tagFamilyFilters) reset() {
tfs.tagFamilyFilters = tfs.tagFamilyFilters[:0]
}
-func (tfs *tagFamilyFilters) unmarshal(tagFamilies map[string]*dataBlock,
metaReader, filterReader map[string]fs.Reader) {
+func (tfs *tagFamilyFilters) unmarshal(tagFamilies map[string]*dataBlock,
metaReader, filterReader, tagValueReader map[string]fs.Reader) {
for tf := range tagFamilies {
tff := generateTagFamilyFilter()
- tff.unmarshal(tagFamilies[tf], metaReader[tf], filterReader[tf])
+ tff.unmarshal(tagFamilies[tf], metaReader[tf],
filterReader[tf], tagValueReader[tf])
tfs.tagFamilyFilters = append(tfs.tagFamilyFilters, tff)
}
}
diff --git a/banyand/stream/tag_filter_test.go
b/banyand/stream/tag_filter_test.go
index d059f0e3..7ea3dfc2 100644
--- a/banyand/stream/tag_filter_test.go
+++ b/banyand/stream/tag_filter_test.go
@@ -462,7 +462,7 @@ func BenchmarkTagFamilyFiltersUnmarshal(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
tfs := generateTagFamilyFilters()
- tfs.unmarshal(tagFamilies, metaReaders,
filterReaders)
+ tfs.unmarshal(tagFamilies, metaReaders,
filterReaders, metaReaders)
releaseTagFamilyFilters(tfs)
}
})
diff --git a/banyand/trace/tag.go b/banyand/trace/tag.go
index c8c0daaf..647358da 100644
--- a/banyand/trace/tag.go
+++ b/banyand/trace/tag.go
@@ -60,7 +60,7 @@ func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer)
{
// Use shared encoding module
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
- err := internalencoding.EncodeTagValues(bb, t.values, t.valueType)
+ _, err := internalencoding.EncodeTagValues(bb, t.values, t.valueType)
if err != nil {
logger.Panicf("failed to encode tag values: %v", err)
}
diff --git a/banyand/trace/tag_test.go b/banyand/trace/tag_test.go
index 52376c87..2ac463d7 100644
--- a/banyand/trace/tag_test.go
+++ b/banyand/trace/tag_test.go
@@ -48,7 +48,7 @@ func TestTagEncodingDecoding(t *testing.T) {
// Test encoding
bb := &pkgbytes.Buffer{}
- err := encoding.EncodeTagValues(bb, tag.values, tag.valueType)
+ _, err := encoding.EncodeTagValues(bb, tag.values,
tag.valueType)
assert.NoError(t, err)
assert.NotNil(t, bb.Buf)
assert.Greater(t, len(bb.Buf), 0)
@@ -74,7 +74,7 @@ func TestTagEncodingDecoding(t *testing.T) {
// Test encoding
bb := &pkgbytes.Buffer{}
- err := encoding.EncodeTagValues(bb, tag.values, tag.valueType)
+ _, err := encoding.EncodeTagValues(bb, tag.values,
tag.valueType)
assert.NoError(t, err)
assert.NotNil(t, bb.Buf)
assert.Greater(t, len(bb.Buf), 0)
@@ -100,7 +100,7 @@ func TestTagEncodingDecoding(t *testing.T) {
// Test encoding
bb := &pkgbytes.Buffer{}
- err := encoding.EncodeTagValues(bb, tag.values, tag.valueType)
+ _, err := encoding.EncodeTagValues(bb, tag.values,
tag.valueType)
assert.NoError(t, err)
assert.NotNil(t, bb.Buf)
assert.Greater(t, len(bb.Buf), 0)
@@ -131,7 +131,7 @@ func TestTagEncodingDecoding(t *testing.T) {
// Test encoding
bb := &pkgbytes.Buffer{}
- err := encoding.EncodeTagValues(bb, tag.values, tag.valueType)
+ _, err := encoding.EncodeTagValues(bb, tag.values,
tag.valueType)
assert.NoError(t, err)
assert.NotNil(t, bb.Buf)
assert.Greater(t, len(bb.Buf), 0)
@@ -162,7 +162,7 @@ func TestTagEncodingDecoding(t *testing.T) {
// Test encoding
bb := &pkgbytes.Buffer{}
- err := encoding.EncodeTagValues(bb, tag.values, tag.valueType)
+ _, err := encoding.EncodeTagValues(bb, tag.values,
tag.valueType)
assert.NoError(t, err)
assert.Nil(t, bb.Buf)
diff --git a/pkg/encoding/dictionary.go b/pkg/encoding/dictionary.go
index ed9b4a61..0aa10965 100644
--- a/pkg/encoding/dictionary.go
+++ b/pkg/encoding/dictionary.go
@@ -255,3 +255,50 @@ func decodeBitPacking(dst []uint32, src []byte) ([]uint32,
error) {
decoder := newBitPackingDecoder(br)
return decoder.decode(dst)
}
+
+// DecodeDictionaryValues extracts only the dictionary values without decoding
indices.
+func DecodeDictionaryValues(src []byte) ([][]byte, error) {
+ if len(src) == 0 {
+ return nil, nil
+ }
+
+ src, count := BytesToVarUint64(src)
+ if count == 0 {
+ return nil, nil
+ }
+
+ // Decode only the values part, skip the indices
+ u64List := GenerateUint64List(0)
+ defer ReleaseUint64List(u64List)
+
+ var tail []byte
+ var err error
+ u64List.L, tail, err = DecodeUint64Block(u64List.L[:0], src, count)
+ if err != nil {
+ return nil, fmt.Errorf("cannot decode string lengths: %w", err)
+ }
+ aLens := u64List.L
+ src = tail
+
+ var decompressedData []byte
+ decompressedData, _, err = decompressBlock(nil, src)
+ if err != nil {
+ return nil, fmt.Errorf("cannot decode bytes block with strings:
%w", err)
+ }
+
+ var values [][]byte
+ data := decompressedData
+ for _, sLen := range aLens {
+ if uint64(len(data)) < sLen {
+ return nil, fmt.Errorf("cannot decode a string with the
length %d bytes from %d bytes", sLen, len(data))
+ }
+ if sLen == 0 {
+ values = append(values, nil)
+ continue
+ }
+ values = append(values, data[:sLen])
+ data = data[sLen:]
+ }
+
+ return values, nil
+}
diff --git a/pkg/filter/dictionary_filter.go b/pkg/filter/dictionary_filter.go
new file mode 100644
index 00000000..56543fb6
--- /dev/null
+++ b/pkg/filter/dictionary_filter.go
@@ -0,0 +1,144 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package filter
+
+import (
+ "bytes"
+
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// DictionaryFilter is a filter implementation backed by a dictionary.
+// Unlike a probabilistic (Bloom) filter, lookups compare directly against
+// the stored values, so MightContain yields no false positives.
+type DictionaryFilter struct {
+    values [][]byte // dictionary entries; for array value types each entry is a whole serialized array
+    valueType pbv1.ValueType // selects scalar vs. element-wise matching in MightContain
+}
+
+// NewDictionaryFilter creates a new dictionary filter with the given values.
+// The slice is stored without copying, and valueType is left at its zero
+// value; call SetValueType before querying array-typed tags.
+func NewDictionaryFilter(values [][]byte) *DictionaryFilter {
+    return &DictionaryFilter{
+        values: values,
+    }
+}
+
+// MightContain checks if an item is in the dictionary.
+// Despite the name (kept for interface parity with Bloom-style filters),
+// the check here is exact: it returns true only when item matches a stored
+// value, or — for array value types — an element inside a stored array.
+func (df *DictionaryFilter) MightContain(item []byte) bool {
+    // Array-typed tags store whole serialized arrays; search inside each.
+    if df.valueType == pbv1.ValueTypeStrArr || df.valueType ==
+pbv1.ValueTypeInt64Arr {
+        for _, serializedArray := range df.values {
+            if containElement(serializedArray, item, df.valueType) {
+                return true
+            }
+        }
+        return false
+    }
+
+    // Scalar types: linear scan for an exact byte-wise match.
+    for _, v := range df.values {
+        if bytes.Equal(v, item) {
+            return true
+        }
+    }
+    return false
+}
+
+// SetValues sets the dictionary values.
+// The slice is stored as-is (no copy is made).
+func (df *DictionaryFilter) SetValues(values [][]byte) {
+    df.values = values
+}
+
+// SetValueType sets the value type for the dictionary filter.
+// Array types (StrArr/Int64Arr) switch MightContain to element-wise search.
+func (df *DictionaryFilter) SetValueType(valueType pbv1.ValueType) {
+    df.valueType = valueType
+}
+
+// Reset resets the dictionary filter.
+// Entries are nilled out before truncating so the retained backing array
+// does not keep the old byte slices alive across reuse.
+func (df *DictionaryFilter) Reset() {
+    for i := range df.values {
+        df.values[i] = nil
+    }
+    df.values = df.values[:0]
+    df.valueType = pbv1.ValueTypeUnknown
+}
+
+// containElement reports whether element occurs inside a single serialized
+// array value, interpreting the bytes according to valueType. Int64 arrays
+// are a concatenation of fixed 8-byte encodings; string arrays use the
+// delimiter/escape encoding handled by containString.
+func containElement(serializedArray []byte, element []byte, valueType
+pbv1.ValueType) bool {
+    if len(serializedArray) == 0 {
+        return false
+    }
+    if valueType == pbv1.ValueTypeInt64Arr {
+        // An encoded int64 element must be exactly 8 bytes.
+        if len(element) != 8 {
+            return false
+        }
+        for i := 0; i < len(serializedArray); i += 8 {
+            // Ignore a trailing partial chunk shorter than 8 bytes.
+            if i+8 > len(serializedArray) {
+                break
+            }
+            if bytes.Equal(serializedArray[i:i+8], element) {
+                return true
+            }
+        }
+        return false
+    }
+    if valueType == pbv1.ValueTypeStrArr {
+        return containString(serializedArray, element)
+    }
+    // Any other value type is not an array; report not found.
+    return false
+}
+
+// containString reports whether element occurs in one serialized string
+// array. Encoding: each entry is terminated by '|', and literal '|' or '\'
+// bytes inside an entry are escaped by a preceding '\'.
+func containString(serializedArray, element []byte) bool {
+    const (
+        entityDelimiter = '|'
+        escape = '\\'
+    )
+
+    src := serializedArray
+    var buf []byte // scratch buffer holding the unescaped current entry
+    for len(src) > 0 {
+        buf = buf[:0]
+        if len(src) == 0 {
+            break
+        }
+        // A delimiter in first position means the current entry is empty.
+        if src[0] == entityDelimiter {
+            if len(element) == 0 {
+                return true
+            }
+            src = src[1:]
+            continue
+        }
+        // Unescape the entry byte by byte up to its terminating delimiter.
+        for len(src) > 0 {
+            switch {
+            case src[0] == escape:
+                // A trailing lone escape is malformed: treat as not found.
+                if len(src) < 2 {
+                    return false
+                }
+                src = src[1:]
+                buf = append(buf, src[0])
+            case src[0] == entityDelimiter:
+                src = src[1:]
+                if bytes.Equal(buf, element) {
+                    return true
+                }
+                // Jump past the inner loop to start the next entry.
+                goto nextElement
+            default:
+                buf = append(buf, src[0])
+            }
+            src = src[1:]
+        }
+        // Input ended inside an unterminated entry.
+        return false
+    nextElement:
+    }
+    return false
+}
diff --git a/pkg/filter/dictionary_filter_test.go b/pkg/filter/dictionary_filter_test.go
new file mode 100644
index 00000000..594479fb
--- /dev/null
+++ b/pkg/filter/dictionary_filter_test.go
@@ -0,0 +1,132 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package filter
+
+import (
+ stdbytes "bytes"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// entityDelimiter and escape mirror the constants used by containString's
+// string-array decoding so the tests can build matching inputs.
+const (
+    entityDelimiter = '|'
+    escape = '\\'
+)
+
+// marshalVarArray appends one array element to dest: the raw bytes with '|'
+// and '\' escaped, followed by a terminating '|'.
+func marshalVarArray(dest, src []byte) []byte {
+    // Fast path: nothing needs escaping, copy and terminate.
+    if stdbytes.IndexByte(src, entityDelimiter) < 0 &&
+stdbytes.IndexByte(src, escape) < 0 {
+        dest = append(dest, src...)
+        dest = append(dest, entityDelimiter)
+        return dest
+    }
+    for _, b := range src {
+        if b == entityDelimiter || b == escape {
+            dest = append(dest, escape)
+        }
+        dest = append(dest, b)
+    }
+    dest = append(dest, entityDelimiter)
+    return dest
+}
+
+// TestDictionaryFilter verifies exact membership for scalar values,
+// including the empty value.
+func TestDictionaryFilter(t *testing.T) {
+    assert := assert.New(t)
+
+    items := [][]byte{
+        []byte("skywalking"),
+        []byte("banyandb"),
+        []byte(""),
+        []byte("hello"),
+        []byte("world"),
+    }
+
+    // Only the first three items are loaded into the filter.
+    df := NewDictionaryFilter(items[:3])
+    assert.NotNil(df)
+
+    for i := 0; i < 3; i++ {
+        mightContain := df.MightContain(items[i])
+        assert.True(mightContain)
+    }
+
+    // The remaining items were never added and must be rejected.
+    for i := 3; i < 5; i++ {
+        mightContain := df.MightContain(items[i])
+        assert.False(mightContain)
+    }
+}
+
+// TestDictionaryFilterResetClearsValues ensures Reset drops all stored
+// entries so a reused filter no longer matches old keys.
+func TestDictionaryFilterResetClearsValues(t *testing.T) {
+    assert := assert.New(t)
+
+    key := []byte("reuse-key")
+
+    df := NewDictionaryFilter([][]byte{key})
+    assert.True(df.MightContain(key))
+
+    df.Reset()
+    assert.False(df.MightContain(key))
+}
+
+// TestDictionaryFilterInt64Arr checks element-wise lookup inside a single
+// serialized int64 array (three fixed 8-byte values back to back).
+func TestDictionaryFilterInt64Arr(t *testing.T) {
+    assert := assert.New(t)
+
+    items := []int64{1, 2, 3, 4, 5}
+    dst := make([]byte, 0, 24)
+    for i := 0; i < 3; i++ {
+        dst = append(dst, convert.Int64ToBytes(items[i])...)
+    }
+
+    // One dictionary entry holds the whole serialized array.
+    df := NewDictionaryFilter([][]byte{dst})
+    df.SetValueType(pbv1.ValueTypeInt64Arr)
+
+    for i := 0; i < 3; i++ {
+        assert.True(df.MightContain(convert.Int64ToBytes(items[i])))
+    }
+    for i := 3; i < 5; i++ {
+        assert.False(df.MightContain(convert.Int64ToBytes(items[i])))
+    }
+}
+
+// TestDictionaryFilterStrArr checks element-wise lookup inside a single
+// serialized string array, including an empty string element.
+func TestDictionaryFilterStrArr(t *testing.T) {
+    assert := assert.New(t)
+
+    items := [][]byte{
+        []byte("skywalking"),
+        []byte("banyandb"),
+        []byte(""),
+        []byte("hello"),
+        []byte("world"),
+    }
+    // Serialize the first three items into one delimiter-encoded array.
+    dst := make([]byte, 0)
+    for i := 0; i < 3; i++ {
+        dst = marshalVarArray(dst, items[i])
+    }
+
+    df := NewDictionaryFilter([][]byte{dst})
+    df.SetValueType(pbv1.ValueTypeStrArr)
+
+    for i := 0; i < 3; i++ {
+        assert.True(df.MightContain(items[i]))
+    }
+    for i := 3; i < 5; i++ {
+        assert.False(df.MightContain(items[i]))
+    }
+}