This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f59cce3 Add Having method for bloom filter (#786)
6f59cce3 is described below

commit 6f59cce399a6e317f3d473f60c54b963ac97dc2c
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 25 12:44:15 2025 +0800

    Add Having method for bloom filter (#786)
    
    * Add Having method to tagFilterOp and tagFamilyFilters for bloom filter 
checks
    
    - Implemented the Having method in tagFilterOp to check if any provided tag 
values might exist in the bloom filter, returning true if at least one value is 
found.
    - Added corresponding Having method in tagFamilyFilters to handle multiple 
tag family filters.
    - Included comprehensive unit tests for both methods to ensure correct 
functionality across various scenarios, including edge cases with empty lists 
and large input sizes.
    - Updated the index.Filter interface to include the Having method for 
consistency in filter operations.
    - Enhanced trace query handling with a new traceHavingFilter to support 
HAVING operations in trace queries.
    
    * Remove license dependency check from pre-push target in Makefile
    
    * Update banyand/stream/tag_filter.go
    
    * Enhance schemaRepo to manage closing groups
    
    - Introduced mechanisms to mark groups as closing, preventing new 
processors from being created during group deletion.
    - Added methods to check and unmark closing groups.
    - Updated the processor management logic to skip registration if a group is 
closing.
    - Ensured graceful shutdown of the local pipeline to avoid metadata events 
during service stop.
    
    ---------
    
    Signed-off-by: Gao Hongtao <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 Makefile                                     |   1 -
 banyand/internal/sidx/tag_filter_op.go       |  35 +++
 banyand/internal/sidx/tag_filter_op_test.go  | 326 +++++++++++++++++++++++++++
 banyand/measure/metadata.go                  | 103 ++++++---
 banyand/measure/svc_standalone.go            |   5 +-
 banyand/measure/topn.go                      |   4 +
 banyand/stream/tag_filter.go                 |  22 ++
 banyand/stream/tag_filter_test.go            | 309 +++++++++++++++++++++++++
 pkg/index/index.go                           |   1 +
 pkg/query/logical/trace/index_filter.go      |  30 ++-
 pkg/query/logical/trace/index_filter_test.go | 281 +++++++++++++++++++++++
 11 files changed, 1084 insertions(+), 33 deletions(-)

diff --git a/Makefile b/Makefile
index 9a962cae..16d9225b 100644
--- a/Makefile
+++ b/Makefile
@@ -126,7 +126,6 @@ pre-push: ## Check source files before pushing to the 
remote repo
        $(MAKE) check-req
        $(MAKE) generate
        $(MAKE) lint
-       $(MAKE) license-dep
        $(MAKE) check
        $(MAKE) vuln-check
 
diff --git a/banyand/internal/sidx/tag_filter_op.go 
b/banyand/internal/sidx/tag_filter_op.go
index 8ed186cc..620f8aff 100644
--- a/banyand/internal/sidx/tag_filter_op.go
+++ b/banyand/internal/sidx/tag_filter_op.go
@@ -73,6 +73,41 @@ func (tfo *tagFilterOp) Eq(tagName string, tagValue string) 
bool {
        return true
 }
 
+// Having checks if any of the provided tag values might exist in the bloom 
filter.
+// It returns true if at least one value might be contained in the bloom 
filter.
+func (tfo *tagFilterOp) Having(tagName string, tagValues []string) bool {
+       if tfo.blockMetadata == nil || tfo.part == nil {
+               return false
+       }
+
+       // Check if the tag exists in the block
+       tagBlock, exists := tfo.blockMetadata.tagsBlocks[tagName]
+       if !exists {
+               return false
+       }
+
+       // Get or create cached tag filter data
+       cache, err := tfo.getTagFilterCache(tagName, tagBlock)
+       if err != nil {
+               logger.Errorf("failed to get tag filter cache for %s: %v", 
tagName, err)
+               return true // Conservative approach - don't filter out
+       }
+
+       // Use bloom filter to check if any value might exist
+       if cache.bloomFilter != nil {
+               for _, tagValue := range tagValues {
+                       if cache.bloomFilter.MightContain([]byte(tagValue)) {
+                               return true // Return true as soon as we find a 
potential match
+                       }
+               }
+               // None of the values might exist in the bloom filter
+               return false
+       }
+
+       // If no bloom filter, conservatively return true
+       return true
+}
+
 // Range checks if a tag is within a specific range using min/max metadata.
 func (tfo *tagFilterOp) Range(tagName string, rangeOpts index.RangeOpts) 
(bool, error) {
        if tfo.blockMetadata == nil || tfo.part == nil {
diff --git a/banyand/internal/sidx/tag_filter_op_test.go 
b/banyand/internal/sidx/tag_filter_op_test.go
index 1f5eff87..611c900b 100644
--- a/banyand/internal/sidx/tag_filter_op_test.go
+++ b/banyand/internal/sidx/tag_filter_op_test.go
@@ -19,6 +19,7 @@ package sidx
 
 import (
        "bytes"
+       "fmt"
        "testing"
 
        "github.com/stretchr/testify/assert"
@@ -491,6 +492,331 @@ func BenchmarkDecodeBloomFilterFromBytes(b *testing.B) {
        }
 }
 
+func BenchmarkTagFilterOpHaving(b *testing.B) {
+       // Setup bloom filter with test data
+       bf := filter.NewBloomFilter(1000)
+       for i := 0; i < 500; i++ {
+               bf.Add([]byte(fmt.Sprintf("service-%d", i)))
+       }
+
+       cache := &tagFilterCache{
+               bloomFilter: bf,
+               valueType:   pbv1.ValueTypeStr,
+       }
+
+       tfo := &tagFilterOp{
+               blockMetadata: &blockMetadata{
+                       tagsBlocks: map[string]dataBlock{
+                               "service": {offset: 0, size: 100},
+                       },
+               },
+               part: &part{},
+               tagCache: map[string]*tagFilterCache{
+                       "service": cache,
+               },
+       }
+
+       // Test different sizes of input lists
+       b.Run("small list (5 items)", func(b *testing.B) {
+               testValues := []string{"service-1", "service-2", "service-3", 
"unknown-1", "unknown-2"}
+               b.ResetTimer()
+               for i := 0; i < b.N; i++ {
+                       tfo.Having("service", testValues)
+               }
+       })
+
+       b.Run("medium list (50 items)", func(b *testing.B) {
+               testValues := make([]string, 50)
+               for i := 0; i < 50; i++ {
+                       if i < 25 {
+                               testValues[i] = fmt.Sprintf("service-%d", i)
+                       } else {
+                               testValues[i] = fmt.Sprintf("unknown-%d", i)
+                       }
+               }
+               b.ResetTimer()
+               for i := 0; i < b.N; i++ {
+                       tfo.Having("service", testValues)
+               }
+       })
+
+       b.Run("large list (500 items)", func(b *testing.B) {
+               testValues := make([]string, 500)
+               for i := 0; i < 500; i++ {
+                       if i < 250 {
+                               testValues[i] = fmt.Sprintf("service-%d", i)
+                       } else {
+                               testValues[i] = fmt.Sprintf("unknown-%d", i)
+                       }
+               }
+               b.ResetTimer()
+               for i := 0; i < b.N; i++ {
+                       tfo.Having("service", testValues)
+               }
+       })
+
+       b.Run("early exit (match first)", func(b *testing.B) {
+               testValues := make([]string, 100)
+               testValues[0] = "service-1" // This will match
+               for i := 1; i < 100; i++ {
+                       testValues[i] = fmt.Sprintf("unknown-%d", i)
+               }
+               b.ResetTimer()
+               for i := 0; i < b.N; i++ {
+                       tfo.Having("service", testValues)
+               }
+       })
+
+       b.Run("no matches", func(b *testing.B) {
+               testValues := make([]string, 100)
+               for i := 0; i < 100; i++ {
+                       testValues[i] = fmt.Sprintf("definitely-not-there-%d", 
i)
+               }
+               b.ResetTimer()
+               for i := 0; i < b.N; i++ {
+                       tfo.Having("service", testValues)
+               }
+       })
+}
+
+func TestTagFilterOpHaving(t *testing.T) {
+       tests := []struct {
+               blockMetadata  *blockMetadata
+               part           *part
+               name           string
+               tagName        string
+               description    string
+               tagValues      []string
+               expectedResult bool
+       }{
+               {
+                       name:           "nil block metadata",
+                       blockMetadata:  nil,
+                       part:           &part{},
+                       tagName:        "service",
+                       tagValues:      []string{"order-service", 
"user-service"},
+                       expectedResult: false,
+                       description:    "should return false when blockMetadata 
is nil",
+               },
+               {
+                       name:           "nil part",
+                       blockMetadata:  &blockMetadata{tagsBlocks: 
make(map[string]dataBlock)},
+                       part:           nil,
+                       tagName:        "service",
+                       tagValues:      []string{"order-service", 
"user-service"},
+                       expectedResult: false,
+                       description:    "should return false when part is nil",
+               },
+               {
+                       name: "tag not in block",
+                       blockMetadata: &blockMetadata{
+                               tagsBlocks: map[string]dataBlock{
+                                       "other_tag": {offset: 0, size: 100},
+                               },
+                       },
+                       part:           &part{},
+                       tagName:        "service",
+                       tagValues:      []string{"order-service", 
"user-service"},
+                       expectedResult: false,
+                       description:    "should return false when tag doesn't 
exist in block",
+               },
+               {
+                       name: "tag exists but no cache data",
+                       blockMetadata: &blockMetadata{
+                               tagsBlocks: map[string]dataBlock{
+                                       "service": {offset: 0, size: 100},
+                               },
+                       },
+                       part:           &part{},
+                       tagName:        "service",
+                       tagValues:      []string{"order-service", 
"user-service"},
+                       expectedResult: true,
+                       description:    "should return true (conservative) when 
tag exists but cache fails",
+               },
+               {
+                       name:           "empty tag values list",
+                       blockMetadata:  &blockMetadata{tagsBlocks: 
make(map[string]dataBlock)},
+                       part:           &part{},
+                       tagName:        "service",
+                       tagValues:      []string{},
+                       expectedResult: false,
+                       description:    "should return false when no tag values 
provided",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       tfo := &tagFilterOp{
+                               blockMetadata: tt.blockMetadata,
+                               part:          tt.part,
+                               tagCache:      make(map[string]*tagFilterCache),
+                       }
+
+                       result := tfo.Having(tt.tagName, tt.tagValues)
+                       assert.Equal(t, tt.expectedResult, result, 
tt.description)
+               })
+       }
+}
+
+func TestTagFilterOpHavingWithBloomFilter(t *testing.T) {
+       // Create a bloom filter and add some test values
+       bf := filter.NewBloomFilter(100)
+       bf.Add([]byte("order-service"))
+       bf.Add([]byte("payment-service"))
+       bf.Add([]byte("inventory-service"))
+
+       // Create cache with bloom filter
+       cache := &tagFilterCache{
+               bloomFilter: bf,
+               valueType:   pbv1.ValueTypeStr,
+       }
+
+       tfo := &tagFilterOp{
+               blockMetadata: &blockMetadata{
+                       tagsBlocks: map[string]dataBlock{
+                               "service": {offset: 0, size: 100},
+                       },
+               },
+               part: &part{},
+               tagCache: map[string]*tagFilterCache{
+                       "service": cache,
+               },
+       }
+
+       tests := []struct {
+               name           string
+               description    string
+               tagValues      []string
+               expectedResult bool
+       }{
+               {
+                       name:           "all values might exist",
+                       tagValues:      []string{"order-service", 
"payment-service"},
+                       expectedResult: true,
+                       description:    "should return true when all values 
might exist in bloom filter",
+               },
+               {
+                       name:           "some values might exist",
+                       tagValues:      []string{"order-service", 
"non-existent-service"},
+                       expectedResult: true,
+                       description:    "should return true when at least one 
value might exist in bloom filter",
+               },
+               {
+                       name:           "no values might exist",
+                       tagValues:      []string{"non-existent-service-1", 
"non-existent-service-2"},
+                       expectedResult: false,
+                       description:    "should return false when no values 
might exist in bloom filter",
+               },
+               {
+                       name:           "single value exists",
+                       tagValues:      []string{"inventory-service"},
+                       expectedResult: true,
+                       description:    "should return true when single value 
might exist in bloom filter",
+               },
+               {
+                       name:           "single value does not exist",
+                       tagValues:      []string{"definitely-not-there"},
+                       expectedResult: false,
+                       description:    "should return false when single value 
doesn't exist in bloom filter",
+               },
+               {
+                       name:           "empty values list",
+                       tagValues:      []string{},
+                       expectedResult: false,
+                       description:    "should return false when no values 
provided",
+               },
+               {
+                       name:           "mixed case with first match",
+                       tagValues:      []string{"order-service", "unknown-1", 
"unknown-2"},
+                       expectedResult: true,
+                       description:    "should return true immediately when 
first value matches",
+               },
+               {
+                       name:           "mixed case with last match",
+                       tagValues:      []string{"unknown-1", "unknown-2", 
"payment-service"},
+                       expectedResult: true,
+                       description:    "should return true when last value 
matches",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := tfo.Having("service", tt.tagValues)
+                       assert.Equal(t, tt.expectedResult, result, 
tt.description)
+               })
+       }
+}
+
+func TestTagFilterOpHavingWithoutBloomFilter(t *testing.T) {
+       // Create cache without bloom filter
+       cache := &tagFilterCache{
+               bloomFilter: nil,
+               valueType:   pbv1.ValueTypeStr,
+       }
+
+       tfo := &tagFilterOp{
+               blockMetadata: &blockMetadata{
+                       tagsBlocks: map[string]dataBlock{
+                               "service": {offset: 0, size: 100},
+                       },
+               },
+               part: &part{},
+               tagCache: map[string]*tagFilterCache{
+                       "service": cache,
+               },
+       }
+
+       // When no bloom filter is available, should always return true 
(conservative)
+       result := tfo.Having("service", []string{"any-service", 
"another-service"})
+       assert.True(t, result, "should return true when no bloom filter is 
available (conservative approach)")
+
+       // Test with empty list too
+       result = tfo.Having("service", []string{})
+       assert.True(t, result, "should return true even with empty list when no 
bloom filter (conservative approach)")
+}
+
+func TestTagFilterOpHavingLargeList(t *testing.T) {
+       // Create a bloom filter with some values
+       bf := filter.NewBloomFilter(1000)
+       bf.Add([]byte("target-service"))
+
+       cache := &tagFilterCache{
+               bloomFilter: bf,
+               valueType:   pbv1.ValueTypeStr,
+       }
+
+       tfo := &tagFilterOp{
+               blockMetadata: &blockMetadata{
+                       tagsBlocks: map[string]dataBlock{
+                               "service": {offset: 0, size: 100},
+                       },
+               },
+               part: &part{},
+               tagCache: map[string]*tagFilterCache{
+                       "service": cache,
+               },
+       }
+
+       // Create a large list with the target at the end
+       largeList := make([]string, 1000)
+       for i := 0; i < 999; i++ {
+               largeList[i] = fmt.Sprintf("non-existent-service-%d", i)
+       }
+       largeList[999] = "target-service"
+
+       result := tfo.Having("service", largeList)
+       assert.True(t, result, "should handle large lists and find target 
value")
+
+       // Test with large list that has no matches
+       noMatchList := make([]string, 1000)
+       for i := 0; i < 1000; i++ {
+               noMatchList[i] = fmt.Sprintf("definitely-not-there-%d", i)
+       }
+
+       result = tfo.Having("service", noMatchList)
+       assert.False(t, result, "should return false for large list with no 
matches")
+}
+
 func TestTagFilterOpErrorHandling(t *testing.T) {
        t.Run("invalid range bounds", func(t *testing.T) {
                cache := &tagFilterCache{
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 53475c5a..67a7028b 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -80,18 +80,21 @@ type schemaRepo struct {
        metadata         metadata.Repo
        pipeline         queue.Client
        l                *logger.Logger
+       closingGroups    map[string]struct{}
        topNProcessorMap sync.Map
        nodeID           string
        path             string
+       closingGroupsMu  sync.RWMutex
 }
 
 func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, 
nodeID string) *schemaRepo {
        sr := &schemaRepo{
-               path:     path,
-               l:        svc.l,
-               metadata: svc.metadata,
-               pipeline: svc.localPipeline,
-               nodeID:   nodeID,
+               path:          path,
+               l:             svc.l,
+               metadata:      svc.metadata,
+               pipeline:      svc.localPipeline,
+               nodeID:        nodeID,
+               closingGroups: make(map[string]struct{}),
        }
        sr.Repository = resourceSchema.NewRepository(
                svc.metadata,
@@ -105,10 +108,11 @@ func newSchemaRepo(path string, svc *standalone, 
nodeLabels map[string]string, n
 
 func newLiaisonSchemaRepo(path string, svc *liaison, measureDataNodeRegistry 
grpc.NodeRegistry, pipeline queue.Client) *schemaRepo {
        sr := &schemaRepo{
-               path:     path,
-               l:        svc.l,
-               metadata: svc.metadata,
-               pipeline: pipeline,
+               path:          path,
+               l:             svc.l,
+               metadata:      svc.metadata,
+               pipeline:      pipeline,
+               closingGroups: make(map[string]struct{}),
        }
        sr.Repository = resourceSchema.NewRepository(
                svc.metadata,
@@ -120,6 +124,31 @@ func newLiaisonSchemaRepo(path string, svc *liaison, 
measureDataNodeRegistry grp
        return sr
 }
 
+// markGroupClosing marks a group as closing to prevent new processors from 
being created.
+func (sr *schemaRepo) markGroupClosing(group string) {
+       sr.closingGroupsMu.Lock()
+       if sr.closingGroups == nil {
+               sr.closingGroups = make(map[string]struct{})
+       }
+       sr.closingGroups[group] = struct{}{}
+       sr.closingGroupsMu.Unlock()
+}
+
+// unmarkGroupClosing removes the closing mark of a group.
+func (sr *schemaRepo) unmarkGroupClosing(group string) {
+       sr.closingGroupsMu.Lock()
+       delete(sr.closingGroups, group)
+       sr.closingGroupsMu.Unlock()
+}
+
+// isGroupClosing returns true if the group is currently being closed.
+func (sr *schemaRepo) isGroupClosing(group string) bool {
+       sr.closingGroupsMu.RLock()
+       _, ok := sr.closingGroups[group]
+       sr.closingGroupsMu.RUnlock()
+       return ok
+}
+
 func (sr *schemaRepo) start() {
        sr.Watcher()
        sr.metadata.
@@ -218,11 +247,22 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata 
schema.Metadata) {
                        return
                }
                topNSchema := metadata.Spec.(*databasev1.TopNAggregation)
+               // Skip creating/registering processors if the group is closing
+               if sr.isGroupClosing(topNSchema.GetMetadata().GetGroup()) || 
(topNSchema.SourceMeasure != nil && 
sr.isGroupClosing(topNSchema.SourceMeasure.GetGroup())) {
+                       sr.l.Debug().Str("group", 
topNSchema.GetMetadata().GetGroup()).
+                               Str("topN", topNSchema.GetMetadata().GetName()).
+                               Msg("skip TopNAggregation registration: group 
is closing")
+                       return
+               }
                if err := validate.TopNAggregation(topNSchema); err != nil {
                        sr.l.Warn().Err(err).Msg("topNAggregation is ignored")
                        return
                }
                manager := sr.getSteamingManager(topNSchema.SourceMeasure, 
sr.pipeline)
+               if manager == nil {
+                       // group is closing; skip registering
+                       return
+               }
                manager.register(topNSchema)
        default:
        }
@@ -235,12 +275,16 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
                if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
                        return
                }
+               // Mark group as closing to prevent new processors from being 
created during deletion
+               sr.markGroupClosing(g.Metadata.Name)
                sr.SendMetadataEvent(resourceSchema.MetadataEvent{
                        Typ:      resourceSchema.EventDelete,
                        Kind:     resourceSchema.EventKindGroup,
                        Metadata: g,
                })
                sr.stopAllProcessorsWithGroupPrefix(g.Metadata.Name)
+               // Deletion completed; allow future re-creation
+               sr.unmarkGroupClosing(g.Metadata.Name)
        case schema.KindMeasure:
                m := metadata.Spec.(*databasev1.Measure)
                sr.SendMetadataEvent(resourceSchema.MetadataEvent{
@@ -356,30 +400,31 @@ func (sr *schemaRepo) createTopNResultMeasure(ctx 
context.Context, measureSchema
 }
 
 func (sr *schemaRepo) stopAllProcessorsWithGroupPrefix(groupName string) {
-       var keysToDelete []string
        groupPrefix := groupName + "/"
-
-       sr.topNProcessorMap.Range(func(key, _ any) bool {
-               keyStr := key.(string)
-               if strings.HasPrefix(keyStr, groupPrefix) {
-                       keysToDelete = append(keysToDelete, keyStr)
-               }
-               return true
-       })
-
-       for _, key := range keysToDelete {
-               if v, ok := sr.topNProcessorMap.Load(key); ok {
-                       manager := v.(*topNProcessorManager)
-                       if err := manager.Close(); err != nil {
-                               sr.l.Error().Err(err).Str("key", 
key).Msg("failed to close topN processor manager")
-                       } else {
-                               sr.topNProcessorMap.Delete(key)
+       totalClosed := 0
+       for {
+               closedThisRound := 0
+               sr.topNProcessorMap.Range(func(key, val any) bool {
+                       keyStr := key.(string)
+                       if strings.HasPrefix(keyStr, groupPrefix) {
+                               manager := val.(*topNProcessorManager)
+                               if err := manager.Close(); err != nil {
+                                       sr.l.Error().Err(err).Str("key", 
keyStr).Msg("failed to close topN processor manager")
+                               } else {
+                                       sr.topNProcessorMap.Delete(keyStr)
+                                       closedThisRound++
+                               }
                        }
+                       return true
+               })
+               if closedThisRound == 0 {
+                       break
                }
+               totalClosed += closedThisRound
+               // continue to ensure no late-created managers remain
        }
-
-       if len(keysToDelete) > 0 {
-               sr.l.Info().Str("groupName", groupName).Int("count", 
len(keysToDelete)).Msg("stopped topN processors for group")
+       if totalClosed > 0 {
+               sr.l.Info().Str("groupName", groupName).Int("count", 
totalClosed).Msg("stopped topN processors for group")
        }
 }
 
diff --git a/banyand/measure/svc_standalone.go 
b/banyand/measure/svc_standalone.go
index 9b03b4da..5d2570d7 100644
--- a/banyand/measure/svc_standalone.go
+++ b/banyand/measure/svc_standalone.go
@@ -304,11 +304,12 @@ func (s *standalone) GracefulStop() {
        }
 
        observability.MetricsCollector.Unregister("measure_cache")
-       s.schemaRepo.Close()
-       s.c.Close()
+       // Stop local pipeline first to prevent new metadata/write events 
creating processors during shutdown
        if s.localPipeline != nil {
                s.localPipeline.GracefulStop()
        }
+       s.schemaRepo.Close()
+       s.c.Close()
 }
 
 func (s *standalone) collectCacheMetrics() {
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 29e5577d..361c3764 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -81,6 +81,10 @@ func (sr *schemaRepo) inFlow(stm *databasev1.Measure, 
seriesID uint64, shardID u
 
 func (sr *schemaRepo) getSteamingManager(source *commonv1.Metadata, pipeline 
queue.Client) (manager *topNProcessorManager) {
        key := getKey(source)
+       // avoid creating a new manager if the source group is closing
+       if sr.isGroupClosing(source.GetGroup()) {
+               return nil
+       }
        sourceMeasure, ok := sr.loadMeasure(source)
        if !ok {
                m, _ := sr.topNProcessorMap.LoadOrStore(key, 
&topNProcessorManager{
diff --git a/banyand/stream/tag_filter.go b/banyand/stream/tag_filter.go
index 39fd3d8e..1f7925be 100644
--- a/banyand/stream/tag_filter.go
+++ b/banyand/stream/tag_filter.go
@@ -203,6 +203,28 @@ func (tfs *tagFamilyFilters) Range(tagName string, 
rangeOpts index.RangeOpts) (b
        return true, nil
 }
 
+// Having checks if any of the provided tag values might exist in the bloom 
filter.
+// It returns true if at least one value might be contained in any tag family 
filter.
+func (tfs *tagFamilyFilters) Having(tagName string, tagValues []string) bool {
+       for _, tff := range tfs.tagFamilyFilters {
+               if tf, ok := (*tff)[tagName]; ok {
+                       if tf.filter != nil {
+                               for _, tagValue := range tagValues {
+                                       if 
tf.filter.MightContain([]byte(tagValue)) {
+                                               return true // Return true as 
soon as we find a potential match
+                                       }
+                               }
+                               // None of the values might exist in this tag 
family filter
+                               return false
+                       }
+                       // If no bloom filter, conservatively return true
+                       return true
+               }
+       }
+       // If tag is not found in any tag family filter, return true 
(conservative)
+       return true
+}
+
 func generateTagFamilyFilters() *tagFamilyFilters {
        v := tagFamilyFiltersPool.Get()
        if v == nil {
diff --git a/banyand/stream/tag_filter_test.go 
b/banyand/stream/tag_filter_test.go
index b381ed78..d059f0e3 100644
--- a/banyand/stream/tag_filter_test.go
+++ b/banyand/stream/tag_filter_test.go
@@ -122,6 +122,315 @@ func generateMetaAndFilter(tagCount int, itemsPerTag int) 
([]byte, []byte) {
        return metaBuf, filterBuf.Bytes()
 }
 
+func TestTagFamilyFiltersHaving(t *testing.T) {
+       // Create a tag family filter with test data
+       bf := filter.NewBloomFilter(100)
+       bf.Add([]byte("service-1"))
+       bf.Add([]byte("service-2"))
+       bf.Add([]byte("service-3"))
+
+       // Create tag filter
+       tf := &tagFilter{
+               filter: bf,
+               min:    []byte{},
+               max:    []byte{},
+       }
+
+       // Create tag family filter map
+       tff := &tagFamilyFilter{
+               "service": tf,
+       }
+
+       // Create tag family filters
+       tfs := &tagFamilyFilters{
+               tagFamilyFilters: []*tagFamilyFilter{tff},
+       }
+
+       tests := []struct {
+               name           string
+               tagName        string
+               description    string
+               tagValues      []string
+               expectedResult bool
+       }{
+               {
+                       name:           "all values might exist",
+                       tagName:        "service",
+                       tagValues:      []string{"service-1", "service-2"},
+                       expectedResult: true,
+                       description:    "should return true when all values 
might exist in bloom filter",
+               },
+               {
+                       name:           "some values might exist",
+                       tagName:        "service",
+                       tagValues:      []string{"service-1", 
"unknown-service"},
+                       expectedResult: true,
+                       description:    "should return true when at least one 
value might exist in bloom filter",
+               },
+               {
+                       name:           "no values might exist",
+                       tagName:        "service",
+                       tagValues:      []string{"unknown-1", "unknown-2"},
+                       expectedResult: false,
+                       description:    "should return false when no values 
might exist in bloom filter",
+               },
+               {
+                       name:           "single value exists",
+                       tagName:        "service",
+                       tagValues:      []string{"service-3"},
+                       expectedResult: true,
+                       description:    "should return true when single value 
might exist in bloom filter",
+               },
+               {
+                       name:           "single value does not exist",
+                       tagName:        "service",
+                       tagValues:      []string{"definitely-not-there"},
+                       expectedResult: false,
+                       description:    "should return false when single value 
doesn't exist in bloom filter",
+               },
+               {
+                       name:           "empty values list",
+                       tagName:        "service",
+                       tagValues:      []string{},
+                       expectedResult: false,
+                       description:    "should return false when no values 
provided",
+               },
+               {
+                       name:           "tag not found",
+                       tagName:        "non-existent-tag",
+                       tagValues:      []string{"service-1"},
+                       expectedResult: true,
+                       description:    "should return true (conservative) when 
tag is not found",
+               },
+               {
+                       name:           "early exit - first match",
+                       tagName:        "service",
+                       tagValues:      []string{"service-1", "unknown-1", 
"unknown-2"},
+                       expectedResult: true,
+                       description:    "should return true immediately when 
first value matches",
+               },
+               {
+                       name:           "late match",
+                       tagName:        "service",
+                       tagValues:      []string{"unknown-1", "unknown-2", 
"service-2"},
+                       expectedResult: true,
+                       description:    "should return true when last value 
matches",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := tfs.Having(tt.tagName, tt.tagValues)
+                       assert.Equal(t, tt.expectedResult, result, 
tt.description)
+               })
+       }
+}
+
+func TestTagFamilyFiltersHavingWithoutBloomFilter(t *testing.T) {
+       assert := assert.New(t)
+
+       // Create tag filter without bloom filter
+       tf := &tagFilter{
+               filter: nil, // No bloom filter
+               min:    []byte{},
+               max:    []byte{},
+       }
+
+       // Create tag family filter map
+       tff := &tagFamilyFilter{
+               "service": tf,
+       }
+
+       // Create tag family filters
+       tfs := &tagFamilyFilters{
+               tagFamilyFilters: []*tagFamilyFilter{tff},
+       }
+
+       // When no bloom filter is available, should always return true 
(conservative)
+       result := tfs.Having("service", []string{"any-service", 
"another-service"})
+       assert.True(result, "should return true when no bloom filter is 
available (conservative approach)")
+
+       // Test with empty list too
+       result = tfs.Having("service", []string{})
+       assert.True(result, "should return true even with empty list when no 
bloom filter (conservative approach)")
+}
+
+func TestTagFamilyFiltersHavingMultipleTagFamilies(t *testing.T) {
+       assert := assert.New(t)
+
+       // Create first tag family filter
+       bf1 := filter.NewBloomFilter(50)
+       bf1.Add([]byte("service-1"))
+       bf1.Add([]byte("service-2"))
+
+       tf1 := &tagFilter{
+               filter: bf1,
+               min:    []byte{},
+               max:    []byte{},
+       }
+
+       tff1 := &tagFamilyFilter{
+               "service": tf1,
+       }
+
+       // Create second tag family filter
+       bf2 := filter.NewBloomFilter(50)
+       bf2.Add([]byte("user-1"))
+       bf2.Add([]byte("user-2"))
+
+       tf2 := &tagFilter{
+               filter: bf2,
+               min:    []byte{},
+               max:    []byte{},
+       }
+
+       tff2 := &tagFamilyFilter{
+               "user": tf2,
+       }
+
+       // Create tag family filters with multiple families
+       tfs := &tagFamilyFilters{
+               tagFamilyFilters: []*tagFamilyFilter{tff1, tff2},
+       }
+
+       // Test service tag (should find in first family)
+       result := tfs.Having("service", []string{"service-1", "unknown"})
+       assert.True(result, "should find service-1 in first tag family")
+
+       // Test user tag (should find in second family)
+       result = tfs.Having("user", []string{"user-2", "unknown"})
+       assert.True(result, "should find user-2 in second tag family")
+
+       // Test non-existent values
+       result = tfs.Having("service", []string{"unknown-1", "unknown-2"})
+       assert.False(result, "should not find unknown values")
+}
+
+func TestTagFamilyFiltersHavingLargeList(t *testing.T) {
+       assert := assert.New(t)
+
+       // Create a bloom filter with one target value
+       bf := filter.NewBloomFilter(1000)
+       bf.Add([]byte("target-service"))
+
+       tf := &tagFilter{
+               filter: bf,
+               min:    []byte{},
+               max:    []byte{},
+       }
+
+       tff := &tagFamilyFilter{
+               "service": tf,
+       }
+
+       tfs := &tagFamilyFilters{
+               tagFamilyFilters: []*tagFamilyFilter{tff},
+       }
+
+       // Create a large list with the target at the end
+       largeList := make([]string, 1000)
+       for i := 0; i < 999; i++ {
+               largeList[i] = fmt.Sprintf("non-existent-service-%d", i)
+       }
+       largeList[999] = "target-service"
+
+       result := tfs.Having("service", largeList)
+       assert.True(result, "should handle large lists and find target value")
+
+       // Test with large list that has no matches
+       noMatchList := make([]string, 1000)
+       for i := 0; i < 1000; i++ {
+               noMatchList[i] = fmt.Sprintf("definitely-not-there-%d", i)
+       }
+
+       result = tfs.Having("service", noMatchList)
+       assert.False(result, "should return false for large list with no 
matches")
+}
+
+func BenchmarkTagFamilyFiltersHaving(b *testing.B) {
+       // Setup bloom filter with test data
+       bf := filter.NewBloomFilter(1000)
+       for i := 0; i < 500; i++ {
+               bf.Add([]byte(fmt.Sprintf("service-%d", i)))
+       }
+
+       tf := &tagFilter{
+               filter: bf,
+               min:    []byte{},
+               max:    []byte{},
+       }
+
+       tff := &tagFamilyFilter{
+               "service": tf,
+       }
+
+       tfs := &tagFamilyFilters{
+               tagFamilyFilters: []*tagFamilyFilter{tff},
+       }
+
+       // Test different sizes of input lists
+       b.Run("small list (5 items)", func(b *testing.B) {
+               testValues := []string{"service-1", "service-2", "service-3", 
"unknown-1", "unknown-2"}
+               b.ResetTimer()
+               for i := 0; i < b.N; i++ {
+                       tfs.Having("service", testValues)
+               }
+       })
+
+       b.Run("medium list (50 items)", func(b *testing.B) {
+               testValues := make([]string, 50)
+               for i := 0; i < 50; i++ {
+                       if i < 25 {
+                               testValues[i] = fmt.Sprintf("service-%d", i)
+                       } else {
+                               testValues[i] = fmt.Sprintf("unknown-%d", i)
+                       }
+               }
+               b.ResetTimer()
+               for i := 0; i < b.N; i++ {
+                       tfs.Having("service", testValues)
+               }
+       })
+
+       b.Run("large list (500 items)", func(b *testing.B) {
+               testValues := make([]string, 500)
+               for i := 0; i < 500; i++ {
+                       if i < 250 {
+                               testValues[i] = fmt.Sprintf("service-%d", i)
+                       } else {
+                               testValues[i] = fmt.Sprintf("unknown-%d", i)
+                       }
+               }
+               b.ResetTimer()
+               for i := 0; i < b.N; i++ {
+                       tfs.Having("service", testValues)
+               }
+       })
+
+       b.Run("early exit (match first)", func(b *testing.B) {
+               testValues := make([]string, 100)
+               testValues[0] = "service-1" // This will match
+               for i := 1; i < 100; i++ {
+                       testValues[i] = fmt.Sprintf("unknown-%d", i)
+               }
+               b.ResetTimer()
+               for i := 0; i < b.N; i++ {
+                       tfs.Having("service", testValues)
+               }
+       })
+
+       b.Run("no matches", func(b *testing.B) {
+               testValues := make([]string, 100)
+               for i := 0; i < 100; i++ {
+                       testValues[i] = fmt.Sprintf("definitely-not-there-%d", 
i)
+               }
+               b.ResetTimer()
+               for i := 0; i < b.N; i++ {
+                       tfs.Having("service", testValues)
+               }
+       })
+}
+
 func BenchmarkTagFamilyFiltersUnmarshal(b *testing.B) {
        testCases := []struct {
                tagFamilyCount int
diff --git a/pkg/index/index.go b/pkg/index/index.go
index ad8673c7..6e77f821 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -941,4 +941,5 @@ type Filter interface {
 type FilterOp interface {
        Eq(tagName string, tagValue string) bool
        Range(tagName string, rangeOpts RangeOpts) (bool, error)
+       Having(tagName string, tagValues []string) bool
 }
diff --git a/pkg/query/logical/trace/index_filter.go 
b/pkg/query/logical/trace/index_filter.go
index 6faa950d..d99a0253 100644
--- a/pkg/query/logical/trace/index_filter.go
+++ b/pkg/query/logical/trace/index_filter.go
@@ -71,7 +71,7 @@ func parseConditionToFilter(cond *modelv1.Condition, schema 
logical.Schema,
        case modelv1.Condition_BINARY_OP_MATCH:
                return &traceMatchFilter{op: "match", tagName: cond.Name}, 
[][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_HAVING:
-               return &traceFilter{op: "having", tagName: cond.Name}, 
[][]*modelv1.TagValue{entity}, nil
+               return &traceHavingFilter{op: "having", tagName: cond.Name, 
expr: expr}, [][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_NOT_HAVING:
                return &traceFilter{op: "not_having", tagName: cond.Name}, 
[][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_IN:
@@ -250,6 +250,34 @@ func (tmf *traceMatchFilter) String() string {
        return tmf.op + ":" + tmf.tagName
 }
 
+// traceHavingFilter implements index.Filter for HAVING operations in trace 
queries.
+type traceHavingFilter struct {
+       expr    logical.LiteralExpr
+       op      string
+       tagName string
+}
+
+func (thf *traceHavingFilter) Execute(_ index.GetSearcher, _ common.SeriesID, 
_ *index.RangeOpts) (posting.List, posting.List, error) {
+       panic("traceHavingFilter.Execute should not be invoked")
+}
+
+func (thf *traceHavingFilter) ShouldSkip(tagFilters index.FilterOp) (bool, 
error) {
+       // Use the parsed expression to get the tag values and invoke 
tagFilters.Having
+       if thf.expr != nil {
+               subExprs := thf.expr.SubExprs()
+               tagValues := make([]string, len(subExprs))
+               for i, subExpr := range subExprs {
+                       tagValues[i] = subExpr.String()
+               }
+               return !tagFilters.Having(thf.tagName, tagValues), nil
+       }
+       return false, nil
+}
+
+func (thf *traceHavingFilter) String() string {
+       return thf.op + ":" + thf.tagName
+}
+
 // extractTraceIDsFromCondition extracts trace IDs from equal and in 
conditions.
 func extractTraceIDsFromCondition(cond *modelv1.Condition) []string {
        var traceIDs []string
diff --git a/pkg/query/logical/trace/index_filter_test.go 
b/pkg/query/logical/trace/index_filter_test.go
new file mode 100644
index 00000000..b8f3a186
--- /dev/null
+++ b/pkg/query/logical/trace/index_filter_test.go
@@ -0,0 +1,281 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+type MockFilterOp struct {
+       eqResults     map[string]map[string]bool
+       havingResults map[string]map[string]bool
+}
+
+func NewMockFilterOp() *MockFilterOp {
+       return &MockFilterOp{
+               eqResults:     make(map[string]map[string]bool),
+               havingResults: make(map[string]map[string]bool),
+       }
+}
+
+func (m *MockFilterOp) Eq(tagName string, tagValue string) bool {
+       if tagResults, ok := m.eqResults[tagName]; ok {
+               if result, ok := tagResults[tagValue]; ok {
+                       return result
+               }
+       }
+       return false
+}
+
+func (m *MockFilterOp) Range(_ string, _ index.RangeOpts) (bool, error) {
+       return false, nil
+}
+
+func (m *MockFilterOp) Having(tagName string, tagValues []string) bool {
+       if tagResults, ok := m.havingResults[tagName]; ok {
+               key := ""
+               for i, val := range tagValues {
+                       if i > 0 {
+                               key += ","
+                       }
+                       key += val
+               }
+               if result, ok := tagResults[key]; ok {
+                       return result
+               }
+       }
+       return false
+}
+
+func (m *MockFilterOp) SetHavingResult(tagName string, tagValues []string, 
result bool) {
+       if m.havingResults[tagName] == nil {
+               m.havingResults[tagName] = make(map[string]bool)
+       }
+       key := ""
+       for i, val := range tagValues {
+               if i > 0 {
+                       key += ","
+               }
+               key += val
+       }
+       m.havingResults[tagName][key] = result
+}
+
+type MockLiteralExpr struct {
+       stringValue string
+       subExprs    []logical.LiteralExpr
+       elements    []string
+}
+
+func NewMockLiteralExpr(stringValue string) *MockLiteralExpr {
+       return &MockLiteralExpr{
+               stringValue: stringValue,
+               elements:    []string{stringValue},
+       }
+}
+
+func NewMockLiteralExprWithSubExprs(subExprs []logical.LiteralExpr) 
*MockLiteralExpr {
+       elements := make([]string, len(subExprs))
+       for i, expr := range subExprs {
+               elements[i] = expr.String()
+       }
+       return &MockLiteralExpr{
+               subExprs: subExprs,
+               elements: elements,
+       }
+}
+
+func (m *MockLiteralExpr) String() string {
+       return m.stringValue
+}
+
+func (m *MockLiteralExpr) Elements() []string {
+       return m.elements
+}
+
+func (m *MockLiteralExpr) Equal(other logical.Expr) bool {
+       if o, ok := other.(*MockLiteralExpr); ok {
+               return m.stringValue == o.stringValue
+       }
+       return false
+}
+
+func (m *MockLiteralExpr) SubExprs() []logical.LiteralExpr {
+       return m.subExprs
+}
+
+func (m *MockLiteralExpr) Bytes() [][]byte {
+       result := make([][]byte, len(m.elements))
+       for i, elem := range m.elements {
+               result[i] = []byte(elem)
+       }
+       return result
+}
+
+func (m *MockLiteralExpr) Field(_ index.FieldKey) index.Field {
+       return index.Field{}
+}
+
+func (m *MockLiteralExpr) RangeOpts(_ bool, _ bool, _ bool) index.RangeOpts {
+       return index.RangeOpts{}
+}
+
+func TestTraceHavingFilterShouldSkip(t *testing.T) {
+       tests := []struct {
+               name           string
+               tagName        string
+               description    string
+               subExprStrings []string
+               havingResult   bool
+               expectedSkip   bool
+       }{
+               {
+                       name:           "having matches - should not skip",
+                       tagName:        "service",
+                       subExprStrings: []string{"service-1", "service-2"},
+                       havingResult:   true,
+                       expectedSkip:   false,
+                       description:    "should not skip when Having returns 
true",
+               },
+               {
+                       name:           "having doesn't match - should skip",
+                       tagName:        "service",
+                       subExprStrings: []string{"service-1", "service-2"},
+                       havingResult:   false,
+                       expectedSkip:   true,
+                       description:    "should skip when Having returns false",
+               },
+               {
+                       name:           "single value matches - should not 
skip",
+                       tagName:        "endpoint",
+                       subExprStrings: []string{"/api/v1/users"},
+                       havingResult:   true,
+                       expectedSkip:   false,
+                       description:    "should not skip when single value 
Having returns true",
+               },
+               {
+                       name:           "multiple values no match - should 
skip",
+                       tagName:        "endpoint",
+                       subExprStrings: []string{"/api/v1/unknown", 
"/api/v2/unknown"},
+                       havingResult:   false,
+                       expectedSkip:   true,
+                       description:    "should skip when multiple values 
Having returns false",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Create mock sub-expressions
+                       subExprs := make([]logical.LiteralExpr, 
len(tt.subExprStrings))
+                       for i, str := range tt.subExprStrings {
+                               subExprs[i] = NewMockLiteralExpr(str)
+                       }
+
+                       // Create mock expression
+                       mockExpr := NewMockLiteralExprWithSubExprs(subExprs)
+
+                       // Create trace having filter
+                       filter := &traceHavingFilter{
+                               expr:    mockExpr,
+                               op:      "having",
+                               tagName: tt.tagName,
+                       }
+
+                       // Create mock FilterOp
+                       mockFilterOp := NewMockFilterOp()
+                       mockFilterOp.SetHavingResult(tt.tagName, 
tt.subExprStrings, tt.havingResult)
+
+                       // Test ShouldSkip
+                       result, err := filter.ShouldSkip(mockFilterOp)
+
+                       assert.NoError(t, err, tt.description)
+                       assert.Equal(t, tt.expectedSkip, result, tt.description)
+               })
+       }
+}
+
+func TestTraceHavingFilterShouldSkipNilExpr(t *testing.T) {
+       filter := &traceHavingFilter{
+               expr:    nil,
+               op:      "having",
+               tagName: "service",
+       }
+
+       mockFilterOp := NewMockFilterOp()
+
+       result, err := filter.ShouldSkip(mockFilterOp)
+
+       assert.NoError(t, err)
+       assert.False(t, result, "should not skip when expr is nil")
+}
+
+func TestTraceHavingFilterExecutePanics(t *testing.T) {
+       filter := &traceHavingFilter{
+               expr:    nil,
+               op:      "having",
+               tagName: "service",
+       }
+
+       assert.Panics(t, func() {
+               filter.Execute(nil, 0, nil)
+       }, "Execute should panic")
+}
+
+func TestTraceHavingFilterString(t *testing.T) {
+       filter := &traceHavingFilter{
+               expr:    nil,
+               op:      "having",
+               tagName: "service",
+       }
+
+       result := filter.String()
+       assert.Equal(t, "having:service", result)
+}
+
+func TestTraceHavingFilterIntegration(t *testing.T) {
+       // Integration test to ensure the filter works with realistic data
+
+       // Create sub-expressions
+       subExpr1 := NewMockLiteralExpr("order-service")
+       subExpr2 := NewMockLiteralExpr("user-service")
+
+       // Create main expression
+       mockExpr := 
NewMockLiteralExprWithSubExprs([]logical.LiteralExpr{subExpr1, subExpr2})
+
+       // Create filter
+       filter := &traceHavingFilter{
+               expr:    mockExpr,
+               op:      "having",
+               tagName: "service_name",
+       }
+
+       // Create mock FilterOp that simulates bloom filter behavior
+       mockFilterOp := NewMockFilterOp()
+       mockFilterOp.SetHavingResult("service_name", []string{"order-service", 
"user-service"}, true)
+
+       // Test
+       shouldSkip, err := filter.ShouldSkip(mockFilterOp)
+
+       assert.NoError(t, err)
+       assert.False(t, shouldSkip, "should not skip when services are found")
+}


Reply via email to