This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 6f59cce3 Add Having method for bloom filter (#786)
6f59cce3 is described below
commit 6f59cce399a6e317f3d473f60c54b963ac97dc2c
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 25 12:44:15 2025 +0800
Add Having method for bloom filter (#786)
* Add Having method to tagFilterOp and tagFamilyFilters for bloom filter
checks
- Implemented the Having method in tagFilterOp to check if any provided tag
values might exist in the bloom filter, returning true if at least one value
might be present (bloom filters can yield false positives, never false negatives).
- Added corresponding Having method in tagFamilyFilters to handle multiple
tag family filters.
- Included comprehensive unit tests for both methods to ensure correct
functionality across various scenarios, including edge cases with empty lists
and large input sizes.
- Updated the index.Filter interface to include the Having method for
consistency in filter operations.
- Enhanced trace query handling with a new traceHavingFilter to support
HAVING operations in trace queries.
* Remove license dependency check from pre-push target in Makefile
* Update banyand/stream/tag_filter.go
* Enhance schemaRepo to manage closing groups
- Introduced mechanisms to mark groups as closing, preventing new
processors from being created during group deletion.
- Added methods to check and unmark closing groups.
- Updated the processor management logic to skip registration if a group is
closing.
- Stopped the local pipeline before closing the schema repository during
graceful shutdown, so no metadata events create processors while the service stops.
---------
Signed-off-by: Gao Hongtao <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
Makefile | 1 -
banyand/internal/sidx/tag_filter_op.go | 35 +++
banyand/internal/sidx/tag_filter_op_test.go | 326 +++++++++++++++++++++++++++
banyand/measure/metadata.go | 103 ++++++---
banyand/measure/svc_standalone.go | 5 +-
banyand/measure/topn.go | 4 +
banyand/stream/tag_filter.go | 22 ++
banyand/stream/tag_filter_test.go | 309 +++++++++++++++++++++++++
pkg/index/index.go | 1 +
pkg/query/logical/trace/index_filter.go | 30 ++-
pkg/query/logical/trace/index_filter_test.go | 281 +++++++++++++++++++++++
11 files changed, 1084 insertions(+), 33 deletions(-)
diff --git a/Makefile b/Makefile
index 9a962cae..16d9225b 100644
--- a/Makefile
+++ b/Makefile
@@ -126,7 +126,6 @@ pre-push: ## Check source files before pushing to the
remote repo
$(MAKE) check-req
$(MAKE) generate
$(MAKE) lint
- $(MAKE) license-dep
$(MAKE) check
$(MAKE) vuln-check
diff --git a/banyand/internal/sidx/tag_filter_op.go
b/banyand/internal/sidx/tag_filter_op.go
index 8ed186cc..620f8aff 100644
--- a/banyand/internal/sidx/tag_filter_op.go
+++ b/banyand/internal/sidx/tag_filter_op.go
@@ -73,6 +73,41 @@ func (tfo *tagFilterOp) Eq(tagName string, tagValue string)
bool {
return true
}
+// Having checks if any of the provided tag values might exist in the bloom
filter.
+// It returns true if at least one value might be contained in the bloom
filter.
+func (tfo *tagFilterOp) Having(tagName string, tagValues []string) bool {
+ if tfo.blockMetadata == nil || tfo.part == nil {
+ return false
+ }
+
+ // Check if the tag exists in the block
+ tagBlock, exists := tfo.blockMetadata.tagsBlocks[tagName]
+ if !exists {
+ return false
+ }
+
+ // Get or create cached tag filter data
+ cache, err := tfo.getTagFilterCache(tagName, tagBlock)
+ if err != nil {
+ logger.Errorf("failed to get tag filter cache for %s: %v",
tagName, err)
+ return true // Conservative approach - don't filter out
+ }
+
+ // Use bloom filter to check if any value might exist
+ if cache.bloomFilter != nil {
+ for _, tagValue := range tagValues {
+ if cache.bloomFilter.MightContain([]byte(tagValue)) {
+ return true // Return true as soon as we find a
potential match
+ }
+ }
+ // None of the values might exist in the bloom filter
+ return false
+ }
+
+ // If no bloom filter, conservatively return true
+ return true
+}
+
// Range checks if a tag is within a specific range using min/max metadata.
func (tfo *tagFilterOp) Range(tagName string, rangeOpts index.RangeOpts)
(bool, error) {
if tfo.blockMetadata == nil || tfo.part == nil {
diff --git a/banyand/internal/sidx/tag_filter_op_test.go
b/banyand/internal/sidx/tag_filter_op_test.go
index 1f5eff87..611c900b 100644
--- a/banyand/internal/sidx/tag_filter_op_test.go
+++ b/banyand/internal/sidx/tag_filter_op_test.go
@@ -19,6 +19,7 @@ package sidx
import (
"bytes"
+ "fmt"
"testing"
"github.com/stretchr/testify/assert"
@@ -491,6 +492,331 @@ func BenchmarkDecodeBloomFilterFromBytes(b *testing.B) {
}
}
+func BenchmarkTagFilterOpHaving(b *testing.B) {
+ // Setup bloom filter with test data
+ bf := filter.NewBloomFilter(1000)
+ for i := 0; i < 500; i++ {
+ bf.Add([]byte(fmt.Sprintf("service-%d", i)))
+ }
+
+ cache := &tagFilterCache{
+ bloomFilter: bf,
+ valueType: pbv1.ValueTypeStr,
+ }
+
+ tfo := &tagFilterOp{
+ blockMetadata: &blockMetadata{
+ tagsBlocks: map[string]dataBlock{
+ "service": {offset: 0, size: 100},
+ },
+ },
+ part: &part{},
+ tagCache: map[string]*tagFilterCache{
+ "service": cache,
+ },
+ }
+
+ // Test different sizes of input lists
+ b.Run("small list (5 items)", func(b *testing.B) {
+ testValues := []string{"service-1", "service-2", "service-3",
"unknown-1", "unknown-2"}
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ tfo.Having("service", testValues)
+ }
+ })
+
+ b.Run("medium list (50 items)", func(b *testing.B) {
+ testValues := make([]string, 50)
+ for i := 0; i < 50; i++ {
+ if i < 25 {
+ testValues[i] = fmt.Sprintf("service-%d", i)
+ } else {
+ testValues[i] = fmt.Sprintf("unknown-%d", i)
+ }
+ }
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ tfo.Having("service", testValues)
+ }
+ })
+
+ b.Run("large list (500 items)", func(b *testing.B) {
+ testValues := make([]string, 500)
+ for i := 0; i < 500; i++ {
+ if i < 250 {
+ testValues[i] = fmt.Sprintf("service-%d", i)
+ } else {
+ testValues[i] = fmt.Sprintf("unknown-%d", i)
+ }
+ }
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ tfo.Having("service", testValues)
+ }
+ })
+
+ b.Run("early exit (match first)", func(b *testing.B) {
+ testValues := make([]string, 100)
+ testValues[0] = "service-1" // This will match
+ for i := 1; i < 100; i++ {
+ testValues[i] = fmt.Sprintf("unknown-%d", i)
+ }
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ tfo.Having("service", testValues)
+ }
+ })
+
+ b.Run("no matches", func(b *testing.B) {
+ testValues := make([]string, 100)
+ for i := 0; i < 100; i++ {
+ testValues[i] = fmt.Sprintf("definitely-not-there-%d",
i)
+ }
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ tfo.Having("service", testValues)
+ }
+ })
+}
+
+func TestTagFilterOpHaving(t *testing.T) {
+ tests := []struct {
+ blockMetadata *blockMetadata
+ part *part
+ name string
+ tagName string
+ description string
+ tagValues []string
+ expectedResult bool
+ }{
+ {
+ name: "nil block metadata",
+ blockMetadata: nil,
+ part: &part{},
+ tagName: "service",
+ tagValues: []string{"order-service",
"user-service"},
+ expectedResult: false,
+ description: "should return false when blockMetadata
is nil",
+ },
+ {
+ name: "nil part",
+ blockMetadata: &blockMetadata{tagsBlocks:
make(map[string]dataBlock)},
+ part: nil,
+ tagName: "service",
+ tagValues: []string{"order-service",
"user-service"},
+ expectedResult: false,
+ description: "should return false when part is nil",
+ },
+ {
+ name: "tag not in block",
+ blockMetadata: &blockMetadata{
+ tagsBlocks: map[string]dataBlock{
+ "other_tag": {offset: 0, size: 100},
+ },
+ },
+ part: &part{},
+ tagName: "service",
+ tagValues: []string{"order-service",
"user-service"},
+ expectedResult: false,
+ description: "should return false when tag doesn't
exist in block",
+ },
+ {
+ name: "tag exists but no cache data",
+ blockMetadata: &blockMetadata{
+ tagsBlocks: map[string]dataBlock{
+ "service": {offset: 0, size: 100},
+ },
+ },
+ part: &part{},
+ tagName: "service",
+ tagValues: []string{"order-service",
"user-service"},
+ expectedResult: true,
+ description: "should return true (conservative) when
tag exists but cache fails",
+ },
+ {
+ name: "empty tag values list",
+ blockMetadata: &blockMetadata{tagsBlocks:
make(map[string]dataBlock)},
+ part: &part{},
+ tagName: "service",
+ tagValues: []string{},
+ expectedResult: false,
+ description: "should return false when no tag values
provided",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tfo := &tagFilterOp{
+ blockMetadata: tt.blockMetadata,
+ part: tt.part,
+ tagCache: make(map[string]*tagFilterCache),
+ }
+
+ result := tfo.Having(tt.tagName, tt.tagValues)
+ assert.Equal(t, tt.expectedResult, result,
tt.description)
+ })
+ }
+}
+
+func TestTagFilterOpHavingWithBloomFilter(t *testing.T) {
+ // Create a bloom filter and add some test values
+ bf := filter.NewBloomFilter(100)
+ bf.Add([]byte("order-service"))
+ bf.Add([]byte("payment-service"))
+ bf.Add([]byte("inventory-service"))
+
+ // Create cache with bloom filter
+ cache := &tagFilterCache{
+ bloomFilter: bf,
+ valueType: pbv1.ValueTypeStr,
+ }
+
+ tfo := &tagFilterOp{
+ blockMetadata: &blockMetadata{
+ tagsBlocks: map[string]dataBlock{
+ "service": {offset: 0, size: 100},
+ },
+ },
+ part: &part{},
+ tagCache: map[string]*tagFilterCache{
+ "service": cache,
+ },
+ }
+
+ tests := []struct {
+ name string
+ description string
+ tagValues []string
+ expectedResult bool
+ }{
+ {
+ name: "all values might exist",
+ tagValues: []string{"order-service",
"payment-service"},
+ expectedResult: true,
+ description: "should return true when all values
might exist in bloom filter",
+ },
+ {
+ name: "some values might exist",
+ tagValues: []string{"order-service",
"non-existent-service"},
+ expectedResult: true,
+ description: "should return true when at least one
value might exist in bloom filter",
+ },
+ {
+ name: "no values might exist",
+ tagValues: []string{"non-existent-service-1",
"non-existent-service-2"},
+ expectedResult: false,
+ description: "should return false when no values
might exist in bloom filter",
+ },
+ {
+ name: "single value exists",
+ tagValues: []string{"inventory-service"},
+ expectedResult: true,
+ description: "should return true when single value
might exist in bloom filter",
+ },
+ {
+ name: "single value does not exist",
+ tagValues: []string{"definitely-not-there"},
+ expectedResult: false,
+ description: "should return false when single value
doesn't exist in bloom filter",
+ },
+ {
+ name: "empty values list",
+ tagValues: []string{},
+ expectedResult: false,
+ description: "should return false when no values
provided",
+ },
+ {
+ name: "mixed case with first match",
+ tagValues: []string{"order-service", "unknown-1",
"unknown-2"},
+ expectedResult: true,
+ description: "should return true immediately when
first value matches",
+ },
+ {
+ name: "mixed case with last match",
+ tagValues: []string{"unknown-1", "unknown-2",
"payment-service"},
+ expectedResult: true,
+ description: "should return true when last value
matches",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := tfo.Having("service", tt.tagValues)
+ assert.Equal(t, tt.expectedResult, result,
tt.description)
+ })
+ }
+}
+
+func TestTagFilterOpHavingWithoutBloomFilter(t *testing.T) {
+ // Create cache without bloom filter
+ cache := &tagFilterCache{
+ bloomFilter: nil,
+ valueType: pbv1.ValueTypeStr,
+ }
+
+ tfo := &tagFilterOp{
+ blockMetadata: &blockMetadata{
+ tagsBlocks: map[string]dataBlock{
+ "service": {offset: 0, size: 100},
+ },
+ },
+ part: &part{},
+ tagCache: map[string]*tagFilterCache{
+ "service": cache,
+ },
+ }
+
+ // When no bloom filter is available, should always return true
(conservative)
+ result := tfo.Having("service", []string{"any-service",
"another-service"})
+ assert.True(t, result, "should return true when no bloom filter is
available (conservative approach)")
+
+ // Test with empty list too
+ result = tfo.Having("service", []string{})
+ assert.True(t, result, "should return true even with empty list when no
bloom filter (conservative approach)")
+}
+
+func TestTagFilterOpHavingLargeList(t *testing.T) {
+ // Create a bloom filter with some values
+ bf := filter.NewBloomFilter(1000)
+ bf.Add([]byte("target-service"))
+
+ cache := &tagFilterCache{
+ bloomFilter: bf,
+ valueType: pbv1.ValueTypeStr,
+ }
+
+ tfo := &tagFilterOp{
+ blockMetadata: &blockMetadata{
+ tagsBlocks: map[string]dataBlock{
+ "service": {offset: 0, size: 100},
+ },
+ },
+ part: &part{},
+ tagCache: map[string]*tagFilterCache{
+ "service": cache,
+ },
+ }
+
+ // Create a large list with the target at the end
+ largeList := make([]string, 1000)
+ for i := 0; i < 999; i++ {
+ largeList[i] = fmt.Sprintf("non-existent-service-%d", i)
+ }
+ largeList[999] = "target-service"
+
+ result := tfo.Having("service", largeList)
+ assert.True(t, result, "should handle large lists and find target
value")
+
+ // Test with large list that has no matches
+ noMatchList := make([]string, 1000)
+ for i := 0; i < 1000; i++ {
+ noMatchList[i] = fmt.Sprintf("definitely-not-there-%d", i)
+ }
+
+ result = tfo.Having("service", noMatchList)
+ assert.False(t, result, "should return false for large list with no
matches")
+}
+
func TestTagFilterOpErrorHandling(t *testing.T) {
t.Run("invalid range bounds", func(t *testing.T) {
cache := &tagFilterCache{
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 53475c5a..67a7028b 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -80,18 +80,21 @@ type schemaRepo struct {
metadata metadata.Repo
pipeline queue.Client
l *logger.Logger
+ closingGroups map[string]struct{}
topNProcessorMap sync.Map
nodeID string
path string
+ closingGroupsMu sync.RWMutex
}
func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string,
nodeID string) *schemaRepo {
sr := &schemaRepo{
- path: path,
- l: svc.l,
- metadata: svc.metadata,
- pipeline: svc.localPipeline,
- nodeID: nodeID,
+ path: path,
+ l: svc.l,
+ metadata: svc.metadata,
+ pipeline: svc.localPipeline,
+ nodeID: nodeID,
+ closingGroups: make(map[string]struct{}),
}
sr.Repository = resourceSchema.NewRepository(
svc.metadata,
@@ -105,10 +108,11 @@ func newSchemaRepo(path string, svc *standalone,
nodeLabels map[string]string, n
func newLiaisonSchemaRepo(path string, svc *liaison, measureDataNodeRegistry
grpc.NodeRegistry, pipeline queue.Client) *schemaRepo {
sr := &schemaRepo{
- path: path,
- l: svc.l,
- metadata: svc.metadata,
- pipeline: pipeline,
+ path: path,
+ l: svc.l,
+ metadata: svc.metadata,
+ pipeline: pipeline,
+ closingGroups: make(map[string]struct{}),
}
sr.Repository = resourceSchema.NewRepository(
svc.metadata,
@@ -120,6 +124,31 @@ func newLiaisonSchemaRepo(path string, svc *liaison,
measureDataNodeRegistry grp
return sr
}
+// markGroupClosing marks a group as closing to prevent new processors from
being created.
+func (sr *schemaRepo) markGroupClosing(group string) {
+ sr.closingGroupsMu.Lock()
+ if sr.closingGroups == nil {
+ sr.closingGroups = make(map[string]struct{})
+ }
+ sr.closingGroups[group] = struct{}{}
+ sr.closingGroupsMu.Unlock()
+}
+
+// unmarkGroupClosing removes the closing mark of a group.
+func (sr *schemaRepo) unmarkGroupClosing(group string) {
+ sr.closingGroupsMu.Lock()
+ delete(sr.closingGroups, group)
+ sr.closingGroupsMu.Unlock()
+}
+
+// isGroupClosing returns true if the group is currently being closed.
+func (sr *schemaRepo) isGroupClosing(group string) bool {
+ sr.closingGroupsMu.RLock()
+ _, ok := sr.closingGroups[group]
+ sr.closingGroupsMu.RUnlock()
+ return ok
+}
+
func (sr *schemaRepo) start() {
sr.Watcher()
sr.metadata.
@@ -218,11 +247,22 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata
schema.Metadata) {
return
}
topNSchema := metadata.Spec.(*databasev1.TopNAggregation)
+ // Skip creating/registering processors if the group is closing
+ if sr.isGroupClosing(topNSchema.GetMetadata().GetGroup()) ||
(topNSchema.SourceMeasure != nil &&
sr.isGroupClosing(topNSchema.SourceMeasure.GetGroup())) {
+ sr.l.Debug().Str("group",
topNSchema.GetMetadata().GetGroup()).
+ Str("topN", topNSchema.GetMetadata().GetName()).
+ Msg("skip TopNAggregation registration: group
is closing")
+ return
+ }
if err := validate.TopNAggregation(topNSchema); err != nil {
sr.l.Warn().Err(err).Msg("topNAggregation is ignored")
return
}
manager := sr.getSteamingManager(topNSchema.SourceMeasure,
sr.pipeline)
+ if manager == nil {
+ // group is closing; skip registering
+ return
+ }
manager.register(topNSchema)
default:
}
@@ -235,12 +275,16 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
return
}
+ // Mark group as closing to prevent new processors from being
created during deletion
+ sr.markGroupClosing(g.Metadata.Name)
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventDelete,
Kind: resourceSchema.EventKindGroup,
Metadata: g,
})
sr.stopAllProcessorsWithGroupPrefix(g.Metadata.Name)
+ // Deletion completed; allow future re-creation
+ sr.unmarkGroupClosing(g.Metadata.Name)
case schema.KindMeasure:
m := metadata.Spec.(*databasev1.Measure)
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
@@ -356,30 +400,31 @@ func (sr *schemaRepo) createTopNResultMeasure(ctx
context.Context, measureSchema
}
func (sr *schemaRepo) stopAllProcessorsWithGroupPrefix(groupName string) {
- var keysToDelete []string
groupPrefix := groupName + "/"
-
- sr.topNProcessorMap.Range(func(key, _ any) bool {
- keyStr := key.(string)
- if strings.HasPrefix(keyStr, groupPrefix) {
- keysToDelete = append(keysToDelete, keyStr)
- }
- return true
- })
-
- for _, key := range keysToDelete {
- if v, ok := sr.topNProcessorMap.Load(key); ok {
- manager := v.(*topNProcessorManager)
- if err := manager.Close(); err != nil {
- sr.l.Error().Err(err).Str("key",
key).Msg("failed to close topN processor manager")
- } else {
- sr.topNProcessorMap.Delete(key)
+ totalClosed := 0
+ for {
+ closedThisRound := 0
+ sr.topNProcessorMap.Range(func(key, val any) bool {
+ keyStr := key.(string)
+ if strings.HasPrefix(keyStr, groupPrefix) {
+ manager := val.(*topNProcessorManager)
+ if err := manager.Close(); err != nil {
+ sr.l.Error().Err(err).Str("key",
keyStr).Msg("failed to close topN processor manager")
+ } else {
+ sr.topNProcessorMap.Delete(keyStr)
+ closedThisRound++
+ }
}
+ return true
+ })
+ if closedThisRound == 0 {
+ break
}
+ totalClosed += closedThisRound
+ // continue to ensure no late-created managers remain
}
-
- if len(keysToDelete) > 0 {
- sr.l.Info().Str("groupName", groupName).Int("count",
len(keysToDelete)).Msg("stopped topN processors for group")
+ if totalClosed > 0 {
+ sr.l.Info().Str("groupName", groupName).Int("count",
totalClosed).Msg("stopped topN processors for group")
}
}
diff --git a/banyand/measure/svc_standalone.go
b/banyand/measure/svc_standalone.go
index 9b03b4da..5d2570d7 100644
--- a/banyand/measure/svc_standalone.go
+++ b/banyand/measure/svc_standalone.go
@@ -304,11 +304,12 @@ func (s *standalone) GracefulStop() {
}
observability.MetricsCollector.Unregister("measure_cache")
- s.schemaRepo.Close()
- s.c.Close()
+ // Stop local pipeline first to prevent new metadata/write events
creating processors during shutdown
if s.localPipeline != nil {
s.localPipeline.GracefulStop()
}
+ s.schemaRepo.Close()
+ s.c.Close()
}
func (s *standalone) collectCacheMetrics() {
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 29e5577d..361c3764 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -81,6 +81,10 @@ func (sr *schemaRepo) inFlow(stm *databasev1.Measure,
seriesID uint64, shardID u
func (sr *schemaRepo) getSteamingManager(source *commonv1.Metadata, pipeline
queue.Client) (manager *topNProcessorManager) {
key := getKey(source)
+ // avoid creating a new manager if the source group is closing
+ if sr.isGroupClosing(source.GetGroup()) {
+ return nil
+ }
sourceMeasure, ok := sr.loadMeasure(source)
if !ok {
m, _ := sr.topNProcessorMap.LoadOrStore(key,
&topNProcessorManager{
diff --git a/banyand/stream/tag_filter.go b/banyand/stream/tag_filter.go
index 39fd3d8e..1f7925be 100644
--- a/banyand/stream/tag_filter.go
+++ b/banyand/stream/tag_filter.go
@@ -203,6 +203,28 @@ func (tfs *tagFamilyFilters) Range(tagName string,
rangeOpts index.RangeOpts) (b
return true, nil
}
+// Having checks if any of the provided tag values might exist in the bloom
filter.
+// It returns true if at least one value might be contained in any tag family
filter.
+func (tfs *tagFamilyFilters) Having(tagName string, tagValues []string) bool {
+ for _, tff := range tfs.tagFamilyFilters {
+ if tf, ok := (*tff)[tagName]; ok {
+ if tf.filter != nil {
+ for _, tagValue := range tagValues {
+ if
tf.filter.MightContain([]byte(tagValue)) {
+ return true // Return true as
soon as we find a potential match
+ }
+ }
+ // None of the values might exist in this tag
family filter
+ return false
+ }
+ // If no bloom filter, conservatively return true
+ return true
+ }
+ }
+ // If tag is not found in any tag family filter, return true
(conservative)
+ return true
+}
+
func generateTagFamilyFilters() *tagFamilyFilters {
v := tagFamilyFiltersPool.Get()
if v == nil {
diff --git a/banyand/stream/tag_filter_test.go
b/banyand/stream/tag_filter_test.go
index b381ed78..d059f0e3 100644
--- a/banyand/stream/tag_filter_test.go
+++ b/banyand/stream/tag_filter_test.go
@@ -122,6 +122,315 @@ func generateMetaAndFilter(tagCount int, itemsPerTag int)
([]byte, []byte) {
return metaBuf, filterBuf.Bytes()
}
+func TestTagFamilyFiltersHaving(t *testing.T) {
+ // Create a tag family filter with test data
+ bf := filter.NewBloomFilter(100)
+ bf.Add([]byte("service-1"))
+ bf.Add([]byte("service-2"))
+ bf.Add([]byte("service-3"))
+
+ // Create tag filter
+ tf := &tagFilter{
+ filter: bf,
+ min: []byte{},
+ max: []byte{},
+ }
+
+ // Create tag family filter map
+ tff := &tagFamilyFilter{
+ "service": tf,
+ }
+
+ // Create tag family filters
+ tfs := &tagFamilyFilters{
+ tagFamilyFilters: []*tagFamilyFilter{tff},
+ }
+
+ tests := []struct {
+ name string
+ tagName string
+ description string
+ tagValues []string
+ expectedResult bool
+ }{
+ {
+ name: "all values might exist",
+ tagName: "service",
+ tagValues: []string{"service-1", "service-2"},
+ expectedResult: true,
+ description: "should return true when all values
might exist in bloom filter",
+ },
+ {
+ name: "some values might exist",
+ tagName: "service",
+ tagValues: []string{"service-1",
"unknown-service"},
+ expectedResult: true,
+ description: "should return true when at least one
value might exist in bloom filter",
+ },
+ {
+ name: "no values might exist",
+ tagName: "service",
+ tagValues: []string{"unknown-1", "unknown-2"},
+ expectedResult: false,
+ description: "should return false when no values
might exist in bloom filter",
+ },
+ {
+ name: "single value exists",
+ tagName: "service",
+ tagValues: []string{"service-3"},
+ expectedResult: true,
+ description: "should return true when single value
might exist in bloom filter",
+ },
+ {
+ name: "single value does not exist",
+ tagName: "service",
+ tagValues: []string{"definitely-not-there"},
+ expectedResult: false,
+ description: "should return false when single value
doesn't exist in bloom filter",
+ },
+ {
+ name: "empty values list",
+ tagName: "service",
+ tagValues: []string{},
+ expectedResult: false,
+ description: "should return false when no values
provided",
+ },
+ {
+ name: "tag not found",
+ tagName: "non-existent-tag",
+ tagValues: []string{"service-1"},
+ expectedResult: true,
+ description: "should return true (conservative) when
tag is not found",
+ },
+ {
+ name: "early exit - first match",
+ tagName: "service",
+ tagValues: []string{"service-1", "unknown-1",
"unknown-2"},
+ expectedResult: true,
+ description: "should return true immediately when
first value matches",
+ },
+ {
+ name: "late match",
+ tagName: "service",
+ tagValues: []string{"unknown-1", "unknown-2",
"service-2"},
+ expectedResult: true,
+ description: "should return true when last value
matches",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := tfs.Having(tt.tagName, tt.tagValues)
+ assert.Equal(t, tt.expectedResult, result,
tt.description)
+ })
+ }
+}
+
+func TestTagFamilyFiltersHavingWithoutBloomFilter(t *testing.T) {
+ assert := assert.New(t)
+
+ // Create tag filter without bloom filter
+ tf := &tagFilter{
+ filter: nil, // No bloom filter
+ min: []byte{},
+ max: []byte{},
+ }
+
+ // Create tag family filter map
+ tff := &tagFamilyFilter{
+ "service": tf,
+ }
+
+ // Create tag family filters
+ tfs := &tagFamilyFilters{
+ tagFamilyFilters: []*tagFamilyFilter{tff},
+ }
+
+ // When no bloom filter is available, should always return true
(conservative)
+ result := tfs.Having("service", []string{"any-service",
"another-service"})
+ assert.True(result, "should return true when no bloom filter is
available (conservative approach)")
+
+ // Test with empty list too
+ result = tfs.Having("service", []string{})
+ assert.True(result, "should return true even with empty list when no
bloom filter (conservative approach)")
+}
+
+func TestTagFamilyFiltersHavingMultipleTagFamilies(t *testing.T) {
+ assert := assert.New(t)
+
+ // Create first tag family filter
+ bf1 := filter.NewBloomFilter(50)
+ bf1.Add([]byte("service-1"))
+ bf1.Add([]byte("service-2"))
+
+ tf1 := &tagFilter{
+ filter: bf1,
+ min: []byte{},
+ max: []byte{},
+ }
+
+ tff1 := &tagFamilyFilter{
+ "service": tf1,
+ }
+
+ // Create second tag family filter
+ bf2 := filter.NewBloomFilter(50)
+ bf2.Add([]byte("user-1"))
+ bf2.Add([]byte("user-2"))
+
+ tf2 := &tagFilter{
+ filter: bf2,
+ min: []byte{},
+ max: []byte{},
+ }
+
+ tff2 := &tagFamilyFilter{
+ "user": tf2,
+ }
+
+ // Create tag family filters with multiple families
+ tfs := &tagFamilyFilters{
+ tagFamilyFilters: []*tagFamilyFilter{tff1, tff2},
+ }
+
+ // Test service tag (should find in first family)
+ result := tfs.Having("service", []string{"service-1", "unknown"})
+ assert.True(result, "should find service-1 in first tag family")
+
+ // Test user tag (should find in second family)
+ result = tfs.Having("user", []string{"user-2", "unknown"})
+ assert.True(result, "should find user-2 in second tag family")
+
+ // Test non-existent values
+ result = tfs.Having("service", []string{"unknown-1", "unknown-2"})
+ assert.False(result, "should not find unknown values")
+}
+
+func TestTagFamilyFiltersHavingLargeList(t *testing.T) {
+ assert := assert.New(t)
+
+ // Create a bloom filter with one target value
+ bf := filter.NewBloomFilter(1000)
+ bf.Add([]byte("target-service"))
+
+ tf := &tagFilter{
+ filter: bf,
+ min: []byte{},
+ max: []byte{},
+ }
+
+ tff := &tagFamilyFilter{
+ "service": tf,
+ }
+
+ tfs := &tagFamilyFilters{
+ tagFamilyFilters: []*tagFamilyFilter{tff},
+ }
+
+ // Create a large list with the target at the end
+ largeList := make([]string, 1000)
+ for i := 0; i < 999; i++ {
+ largeList[i] = fmt.Sprintf("non-existent-service-%d", i)
+ }
+ largeList[999] = "target-service"
+
+ result := tfs.Having("service", largeList)
+ assert.True(result, "should handle large lists and find target value")
+
+ // Test with large list that has no matches
+ noMatchList := make([]string, 1000)
+ for i := 0; i < 1000; i++ {
+ noMatchList[i] = fmt.Sprintf("definitely-not-there-%d", i)
+ }
+
+ result = tfs.Having("service", noMatchList)
+ assert.False(result, "should return false for large list with no
matches")
+}
+
+func BenchmarkTagFamilyFiltersHaving(b *testing.B) {
+ // Setup bloom filter with test data
+ bf := filter.NewBloomFilter(1000)
+ for i := 0; i < 500; i++ {
+ bf.Add([]byte(fmt.Sprintf("service-%d", i)))
+ }
+
+ tf := &tagFilter{
+ filter: bf,
+ min: []byte{},
+ max: []byte{},
+ }
+
+ tff := &tagFamilyFilter{
+ "service": tf,
+ }
+
+ tfs := &tagFamilyFilters{
+ tagFamilyFilters: []*tagFamilyFilter{tff},
+ }
+
+ // Test different sizes of input lists
+ b.Run("small list (5 items)", func(b *testing.B) {
+ testValues := []string{"service-1", "service-2", "service-3",
"unknown-1", "unknown-2"}
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ tfs.Having("service", testValues)
+ }
+ })
+
+ b.Run("medium list (50 items)", func(b *testing.B) {
+ testValues := make([]string, 50)
+ for i := 0; i < 50; i++ {
+ if i < 25 {
+ testValues[i] = fmt.Sprintf("service-%d", i)
+ } else {
+ testValues[i] = fmt.Sprintf("unknown-%d", i)
+ }
+ }
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ tfs.Having("service", testValues)
+ }
+ })
+
+ b.Run("large list (500 items)", func(b *testing.B) {
+ testValues := make([]string, 500)
+ for i := 0; i < 500; i++ {
+ if i < 250 {
+ testValues[i] = fmt.Sprintf("service-%d", i)
+ } else {
+ testValues[i] = fmt.Sprintf("unknown-%d", i)
+ }
+ }
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ tfs.Having("service", testValues)
+ }
+ })
+
+ b.Run("early exit (match first)", func(b *testing.B) {
+ testValues := make([]string, 100)
+ testValues[0] = "service-1" // This will match
+ for i := 1; i < 100; i++ {
+ testValues[i] = fmt.Sprintf("unknown-%d", i)
+ }
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ tfs.Having("service", testValues)
+ }
+ })
+
+ b.Run("no matches", func(b *testing.B) {
+ testValues := make([]string, 100)
+ for i := 0; i < 100; i++ {
+ testValues[i] = fmt.Sprintf("definitely-not-there-%d",
i)
+ }
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ tfs.Having("service", testValues)
+ }
+ })
+}
+
func BenchmarkTagFamilyFiltersUnmarshal(b *testing.B) {
testCases := []struct {
tagFamilyCount int
diff --git a/pkg/index/index.go b/pkg/index/index.go
index ad8673c7..6e77f821 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -941,4 +941,5 @@ type Filter interface {
type FilterOp interface {
Eq(tagName string, tagValue string) bool
Range(tagName string, rangeOpts RangeOpts) (bool, error)
+ Having(tagName string, tagValues []string) bool
}
diff --git a/pkg/query/logical/trace/index_filter.go
b/pkg/query/logical/trace/index_filter.go
index 6faa950d..d99a0253 100644
--- a/pkg/query/logical/trace/index_filter.go
+++ b/pkg/query/logical/trace/index_filter.go
@@ -71,7 +71,7 @@ func parseConditionToFilter(cond *modelv1.Condition, schema
logical.Schema,
case modelv1.Condition_BINARY_OP_MATCH:
return &traceMatchFilter{op: "match", tagName: cond.Name},
[][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_HAVING:
- return &traceFilter{op: "having", tagName: cond.Name},
[][]*modelv1.TagValue{entity}, nil
+ return &traceHavingFilter{op: "having", tagName: cond.Name,
expr: expr}, [][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_NOT_HAVING:
return &traceFilter{op: "not_having", tagName: cond.Name},
[][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_IN:
@@ -250,6 +250,34 @@ func (tmf *traceMatchFilter) String() string {
return tmf.op + ":" + tmf.tagName
}
+// traceHavingFilter implements index.Filter for HAVING operations in trace
queries.
+type traceHavingFilter struct {
+ expr logical.LiteralExpr
+ op string
+ tagName string
+}
+
+func (thf *traceHavingFilter) Execute(_ index.GetSearcher, _ common.SeriesID,
_ *index.RangeOpts) (posting.List, posting.List, error) {
+ panic("traceHavingFilter.Execute should not be invoked")
+}
+
+func (thf *traceHavingFilter) ShouldSkip(tagFilters index.FilterOp) (bool,
error) {
+ // Use the parsed expression to get the tag values and invoke
tagFilters.Having
+ if thf.expr != nil {
+ subExprs := thf.expr.SubExprs()
+ tagValues := make([]string, len(subExprs))
+ for i, subExpr := range subExprs {
+ tagValues[i] = subExpr.String()
+ }
+ return !tagFilters.Having(thf.tagName, tagValues), nil
+ }
+ return false, nil
+}
+
+func (thf *traceHavingFilter) String() string {
+ return thf.op + ":" + thf.tagName
+}
+
// extractTraceIDsFromCondition extracts trace IDs from equal and in
conditions.
func extractTraceIDsFromCondition(cond *modelv1.Condition) []string {
var traceIDs []string
diff --git a/pkg/query/logical/trace/index_filter_test.go
b/pkg/query/logical/trace/index_filter_test.go
new file mode 100644
index 00000000..b8f3a186
--- /dev/null
+++ b/pkg/query/logical/trace/index_filter_test.go
@@ -0,0 +1,281 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+// MockFilterOp is an in-memory index.FilterOp stub whose Eq and Having
+// answers are preconfigured per tag name. Anything not configured answers
+// false.
+type MockFilterOp struct {
+	eqResults     map[string]map[string]bool
+	havingResults map[string]map[string]bool
+}
+
+// NewMockFilterOp returns a MockFilterOp with empty result tables.
+func NewMockFilterOp() *MockFilterOp {
+	return &MockFilterOp{
+		eqResults:     make(map[string]map[string]bool),
+		havingResults: make(map[string]map[string]bool),
+	}
+}
+
+// Eq returns the configured result for tagName == tagValue, or false if none
+// was configured. Indexing a missing inner map yields the zero value (false),
+// so no explicit presence checks are needed.
+func (m *MockFilterOp) Eq(tagName string, tagValue string) bool {
+	return m.eqResults[tagName][tagValue]
+}
+
+// Range always reports no match and no error.
+func (m *MockFilterOp) Range(_ string, _ index.RangeOpts) (bool, error) {
+	return false, nil
+}
+
+// Having returns the result configured via SetHavingResult for exactly this
+// tag name and ordered value list, or false if none was configured.
+func (m *MockFilterOp) Having(tagName string, tagValues []string) bool {
+	return m.havingResults[tagName][havingKey(tagValues)]
+}
+
+// SetHavingResult configures the value Having returns for tagName and the
+// given ordered list of tag values.
+func (m *MockFilterOp) SetHavingResult(tagName string, tagValues []string, result bool) {
+	if m.havingResults[tagName] == nil {
+		m.havingResults[tagName] = make(map[string]bool)
+	}
+	m.havingResults[tagName][havingKey(tagValues)] = result
+}
+
+// havingKey encodes an ordered value list as a single map key, shared by
+// Having and SetHavingResult so both sides always agree. Joining with NUL
+// instead of "," keeps values that contain commas from colliding
+// (e.g. {"a,b"} vs {"a", "b"}).
+func havingKey(tagValues []string) string {
+	return strings.Join(tagValues, "\x00")
+}
+
+// MockLiteralExpr is a test double for logical.LiteralExpr.
+// A scalar mock carries one string value, which is also its only element.
+// An array mock carries sub-expressions, uses their String() values as its
+// elements, and has an empty String().
+type MockLiteralExpr struct {
+	stringValue string
+	subExprs    []logical.LiteralExpr
+	elements    []string
+}
+
+// NewMockLiteralExpr builds a scalar mock whose single element is stringValue.
+func NewMockLiteralExpr(stringValue string) *MockLiteralExpr {
+	mock := &MockLiteralExpr{stringValue: stringValue}
+	mock.elements = []string{stringValue}
+	return mock
+}
+
+// NewMockLiteralExprWithSubExprs builds an array mock from subExprs.
+func NewMockLiteralExprWithSubExprs(subExprs []logical.LiteralExpr) *MockLiteralExpr {
+	elems := make([]string, 0, len(subExprs))
+	for _, sub := range subExprs {
+		elems = append(elems, sub.String())
+	}
+	return &MockLiteralExpr{elements: elems, subExprs: subExprs}
+}
+
+// String returns the scalar value ("" for array mocks).
+func (m *MockLiteralExpr) String() string { return m.stringValue }
+
+// Elements returns the mock's element strings (not a copy).
+func (m *MockLiteralExpr) Elements() []string { return m.elements }
+
+// Equal reports whether other is a *MockLiteralExpr with the same scalar value.
+// Only stringValue is compared; elements and sub-expressions are ignored.
+func (m *MockLiteralExpr) Equal(other logical.Expr) bool {
+	o, ok := other.(*MockLiteralExpr)
+	return ok && o.stringValue == m.stringValue
+}
+
+// SubExprs returns the sub-expressions (nil for scalar mocks).
+func (m *MockLiteralExpr) SubExprs() []logical.LiteralExpr { return m.subExprs }
+
+// Bytes returns each element converted to a byte slice, in order.
+func (m *MockLiteralExpr) Bytes() [][]byte {
+	out := make([][]byte, 0, len(m.elements))
+	for _, elem := range m.elements {
+		out = append(out, []byte(elem))
+	}
+	return out
+}
+
+// Field returns the zero index.Field; the key is ignored.
+func (m *MockLiteralExpr) Field(_ index.FieldKey) index.Field {
+	return index.Field{}
+}
+
+// RangeOpts returns zero-valued index.RangeOpts regardless of the flags.
+func (m *MockLiteralExpr) RangeOpts(_ bool, _ bool, _ bool) index.RangeOpts {
+	return index.RangeOpts{}
+}
+
+func TestTraceHavingFilterShouldSkip(t *testing.T) {
+ tests := []struct {
+ name string
+ tagName string
+ description string
+ subExprStrings []string
+ havingResult bool
+ expectedSkip bool
+ }{
+ {
+ name: "having matches - should not skip",
+ tagName: "service",
+ subExprStrings: []string{"service-1", "service-2"},
+ havingResult: true,
+ expectedSkip: false,
+ description: "should not skip when Having returns
true",
+ },
+ {
+ name: "having doesn't match - should skip",
+ tagName: "service",
+ subExprStrings: []string{"service-1", "service-2"},
+ havingResult: false,
+ expectedSkip: true,
+ description: "should skip when Having returns false",
+ },
+ {
+ name: "single value matches - should not
skip",
+ tagName: "endpoint",
+ subExprStrings: []string{"/api/v1/users"},
+ havingResult: true,
+ expectedSkip: false,
+ description: "should not skip when single value
Having returns true",
+ },
+ {
+ name: "multiple values no match - should
skip",
+ tagName: "endpoint",
+ subExprStrings: []string{"/api/v1/unknown",
"/api/v2/unknown"},
+ havingResult: false,
+ expectedSkip: true,
+ description: "should skip when multiple values
Having returns false",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create mock sub-expressions
+ subExprs := make([]logical.LiteralExpr,
len(tt.subExprStrings))
+ for i, str := range tt.subExprStrings {
+ subExprs[i] = NewMockLiteralExpr(str)
+ }
+
+ // Create mock expression
+ mockExpr := NewMockLiteralExprWithSubExprs(subExprs)
+
+ // Create trace having filter
+ filter := &traceHavingFilter{
+ expr: mockExpr,
+ op: "having",
+ tagName: tt.tagName,
+ }
+
+ // Create mock FilterOp
+ mockFilterOp := NewMockFilterOp()
+ mockFilterOp.SetHavingResult(tt.tagName,
tt.subExprStrings, tt.havingResult)
+
+ // Test ShouldSkip
+ result, err := filter.ShouldSkip(mockFilterOp)
+
+ assert.NoError(t, err, tt.description)
+ assert.Equal(t, tt.expectedSkip, result, tt.description)
+ })
+ }
+}
+
+// TestTraceHavingFilterShouldSkipNilExpr verifies that a filter without an
+// expression never asks to skip.
+func TestTraceHavingFilterShouldSkipNilExpr(t *testing.T) {
+	filter := &traceHavingFilter{op: "having", tagName: "service"}
+
+	skip, err := filter.ShouldSkip(NewMockFilterOp())
+	assert.NoError(t, err)
+	assert.False(t, skip, "should not skip when expr is nil")
+}
+
+// TestTraceHavingFilterExecutePanics verifies that calling Execute on a
+// HAVING filter panics.
+func TestTraceHavingFilterExecutePanics(t *testing.T) {
+	filter := &traceHavingFilter{op: "having", tagName: "service"}
+
+	// The results are discarded explicitly; only the panic matters here.
+	invoke := func() {
+		_, _, _ = filter.Execute(nil, 0, nil)
+	}
+	assert.Panics(t, invoke, "Execute should panic")
+}
+
+// TestTraceHavingFilterString verifies the "<op>:<tagName>" rendering.
+func TestTraceHavingFilterString(t *testing.T) {
+	filter := &traceHavingFilter{op: "having", tagName: "service"}
+	assert.Equal(t, "having:service", filter.String())
+}
+
+// TestTraceHavingFilterIntegration exercises ShouldSkip end to end against the
+// mock FilterOp with realistic service names. The FilterOp is configured to
+// report the services as present, so the filter must not skip.
+func TestTraceHavingFilterIntegration(t *testing.T) {
+	services := []string{"order-service", "user-service"}
+
+	subExprs := make([]logical.LiteralExpr, 0, len(services))
+	for _, svc := range services {
+		subExprs = append(subExprs, NewMockLiteralExpr(svc))
+	}
+	filter := &traceHavingFilter{
+		expr:    NewMockLiteralExprWithSubExprs(subExprs),
+		op:      "having",
+		tagName: "service_name",
+	}
+
+	filterOp := NewMockFilterOp()
+	filterOp.SetHavingResult("service_name", services, true)
+
+	shouldSkip, err := filter.ShouldSkip(filterOp)
+	assert.NoError(t, err)
+	assert.False(t, shouldSkip, "should not skip when services are found")
+}