This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
     new df00ff2c A patch to the block cache (#727)
df00ff2c is described below

commit df00ff2c565d6db7f11875cf754da3db4a3fa25f
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Tue Aug 19 06:06:17 2025 +0700

    A patch to the block cache (#727)

    * Refactor cache implementation to use Sizable interface for memory management

    - Updated cache methods to accept and return Sizable objects, enhancing memory size tracking.
    - Enhanced blockMetadataArray with Size method to report its memory size accurately.

    * Add bypass cache implementation for zero cache size scenarios

    - Introduced a new bypassCache type that implements the Cache interface as a no-op cache.
    - Updated PreRun methods in dataSVC and standalone to use NewBypassCache when MaxCacheSize is set to zero, allowing for flexible cache management.
---
 banyand/internal/storage/cache.go      | 90 ++++++++++++++++++++++++++++------
 banyand/internal/storage/cache_test.go | 27 +++++++---
 banyand/internal/storage/segment.go    |  4 +-
 banyand/internal/storage/shard.go      |  4 +-
 banyand/internal/storage/tsdb.go       |  4 +-
 banyand/measure/block_metadata.go      | 26 ++++++++++
 banyand/measure/svc_data.go            |  6 ++-
 banyand/measure/svc_standalone.go      |  6 ++-
 8 files changed, 138 insertions(+), 29 deletions(-)

diff --git a/banyand/internal/storage/cache.go b/banyand/internal/storage/cache.go
index 5814b798..7a890d82 100644
--- a/banyand/internal/storage/cache.go
+++ b/banyand/internal/storage/cache.go
@@ -28,10 +28,15 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+// Sizable represents an object that can report its memory size.
+type Sizable interface {
+	Size() uint64
+}
+
 // Cache encapsulates the cache operations.
type Cache interface { - Get(key EntryKey) any - Put(key EntryKey, value any) + Get(key EntryKey) Sizable + Put(key EntryKey, value Sizable) Close() Requests() uint64 Misses() uint64 @@ -56,8 +61,9 @@ func DefaultCacheConfig() CacheConfig { } type entry struct { - value any + value Sizable lastAccess uint64 + size uint64 } // EntryKey is the key of an entry in the cache. @@ -97,14 +103,14 @@ func (h entryIndexHeap) Swap(i, j int) { h[j].index = j } -func (h *entryIndexHeap) Push(x interface{}) { +func (h *entryIndexHeap) Push(x any) { n := len(*h) ei := x.(*entryIndex) ei.index = n *h = append(*h, ei) } -func (h *entryIndexHeap) Pop() interface{} { +func (h *entryIndexHeap) Pop() any { old := *h n := len(old) x := old[n-1] @@ -121,6 +127,7 @@ type serviceCache struct { stopCh chan struct{} requests uint64 misses uint64 + currentSize uint64 mu sync.RWMutex wg sync.WaitGroup maxCacheSize uint64 @@ -146,6 +153,7 @@ func NewServiceCacheWithConfig(config CacheConfig) Cache { maxCacheSize: uint64(config.MaxCacheSize), cleanupInterval: config.CleanupInterval, idleTimeout: config.IdleTimeout, + currentSize: 0, } sc.wg.Add(1) sc.startCleaner() @@ -159,6 +167,17 @@ func (sc *serviceCache) startCleaner() { }() } +func (sc *serviceCache) removeEntry(key EntryKey) { + if entry, exists := sc.entry[key]; exists { + atomic.AddUint64(&sc.currentSize, ^(entry.size - 1)) + delete(sc.entry, key) + if ei, exists := sc.entryIndex[key]; exists && ei.index >= 0 && ei.index < sc.entryIndexHeap.Len() { + heap.Remove(sc.entryIndexHeap, ei.index) + delete(sc.entryIndex, key) + } + } +} + func (sc *serviceCache) clean() { ticker := time.NewTicker(sc.cleanupInterval) defer ticker.Stop() @@ -169,9 +188,7 @@ func (sc *serviceCache) clean() { sc.mu.Lock() for key, entry := range sc.entry { if now-atomic.LoadUint64(&entry.lastAccess) > uint64(sc.idleTimeout.Nanoseconds()) { - delete(sc.entry, key) - heap.Remove(sc.entryIndexHeap, sc.entryIndex[key].index) - delete(sc.entryIndex, key) + 
sc.removeEntry(key) } } sc.mu.Unlock() @@ -189,7 +206,7 @@ func (sc *serviceCache) Close() { sc.entryIndexHeap = nil } -func (sc *serviceCache) Get(key EntryKey) any { +func (sc *serviceCache) Get(key EntryKey) Sizable { atomic.AddUint64(&sc.requests, 1) sc.mu.RLock() @@ -213,20 +230,29 @@ func (sc *serviceCache) Get(key EntryKey) any { return nil } -func (sc *serviceCache) Put(key EntryKey, value any) { +func (sc *serviceCache) Put(key EntryKey, value Sizable) { sc.mu.Lock() defer sc.mu.Unlock() - for sc.size() > sc.maxCacheSize && sc.len() > 0 { + valueSize := value.Size() + entryOverhead := uint64(unsafe.Sizeof(entry{}) + unsafe.Sizeof(entryIndex{}) + unsafe.Sizeof(key)) + totalSize := valueSize + entryOverhead + + if existing, exists := sc.entry[key]; exists { + atomic.AddUint64(&sc.currentSize, ^(existing.size - 1)) + sc.removeEntry(key) + } + + for atomic.LoadUint64(&sc.currentSize)+totalSize > sc.maxCacheSize && sc.len() > 0 { ei := heap.Pop(sc.entryIndexHeap).(*entryIndex) - delete(sc.entry, ei.key) - delete(sc.entryIndex, ei.key) + sc.removeEntry(ei.key) } now := uint64(time.Now().UnixNano()) e := &entry{ value: value, lastAccess: now, + size: totalSize, } ei := &entryIndex{ key: key, @@ -235,6 +261,7 @@ func (sc *serviceCache) Put(key EntryKey, value any) { sc.entry[key] = e sc.entryIndex[key] = ei heap.Push(sc.entryIndexHeap, ei) + atomic.AddUint64(&sc.currentSize, totalSize) } func (sc *serviceCache) Requests() uint64 { @@ -262,5 +289,40 @@ func (sc *serviceCache) Size() uint64 { } func (sc *serviceCache) size() uint64 { - return uint64(unsafe.Sizeof(*sc)) + return atomic.LoadUint64(&sc.currentSize) +} + +var _ Cache = (*bypassCache)(nil) + +type bypassCache struct{} + +// NewBypassCache creates a no-op cache implementation. 
+func NewBypassCache() Cache { + return &bypassCache{} +} + +func (bc *bypassCache) Get(_ EntryKey) Sizable { + return nil +} + +func (bc *bypassCache) Put(_ EntryKey, _ Sizable) { +} + +func (bc *bypassCache) Close() { +} + +func (bc *bypassCache) Requests() uint64 { + return 0 +} + +func (bc *bypassCache) Misses() uint64 { + return 0 +} + +func (bc *bypassCache) Entries() uint64 { + return 0 +} + +func (bc *bypassCache) Size() uint64 { + return 0 } diff --git a/banyand/internal/storage/cache_test.go b/banyand/internal/storage/cache_test.go index 0bf4eb3d..ad0149da 100644 --- a/banyand/internal/storage/cache_test.go +++ b/banyand/internal/storage/cache_test.go @@ -29,6 +29,19 @@ import ( "github.com/apache/skywalking-banyandb/pkg/run" ) +// testSizableString is a test implementation of Sizable for string values. +type testSizableString struct { + value string +} + +func (s testSizableString) Size() uint64 { + return uint64(len(s.value)) + 16 // base struct overhead +} + +func (s testSizableString) String() string { + return s.value +} + func TestCachePutAndGet(t *testing.T) { serviceCache := NewServiceCache() @@ -39,7 +52,7 @@ func TestCachePutAndGet(t *testing.T) { segmentID: segmentID(0), shardID: common.ShardID(0), } - value := "test-value" + value := testSizableString{value: "test-value"} serviceCache.Put(key, value) assert.Equal(t, uint64(1), serviceCache.Entries()) @@ -67,7 +80,7 @@ func TestCacheEvict(t *testing.T) { serviceCache.maxCacheSize = 50 var expectedKey EntryKey - var expectedValue string + var expectedValue testSizableString for i := 0; i < 10; i++ { key := EntryKey{ group: "test-group", @@ -76,7 +89,7 @@ func TestCacheEvict(t *testing.T) { segmentID: segmentID(i), shardID: common.ShardID(i), } - value := "test-value" + strconv.Itoa(i) + value := testSizableString{value: "test-value" + strconv.Itoa(i)} serviceCache.Put(key, value) if i == 9 { expectedKey, expectedValue = key, value @@ -104,7 +117,7 @@ func TestCacheClean(t *testing.T) { 
segmentID: segmentID(0), shardID: common.ShardID(0), } - value := "test-value" + value := testSizableString{value: "test-value"} serviceCache.Put(key, value) assert.Equal(t, uint64(1), serviceCache.Entries()) @@ -123,7 +136,7 @@ func TestCacheClose(t *testing.T) { segmentID: segmentID(0), shardID: common.ShardID(0), } - value := "test-value" + value := testSizableString{value: "test-value"} serviceCache.Put(key, value) assert.Equal(t, uint64(1), serviceCache.Entries()) @@ -140,7 +153,7 @@ func TestCacheConcurrency(t *testing.T) { const numOperations = 100 var wg sync.WaitGroup var expectedKey EntryKey - var expectedValue string + var expectedValue testSizableString wg.Add(numGoroutines * 2) for i := 0; i < numGoroutines; i++ { go func(id int) { @@ -154,7 +167,7 @@ func TestCacheConcurrency(t *testing.T) { segmentID: segmentID(num), shardID: common.ShardID(num), } - value := "test-value" + strconv.Itoa(num) + value := testSizableString{value: "test-value" + strconv.Itoa(num)} if id == 0 && j == 0 { expectedKey, expectedValue = key, value } diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 1c4c1ce0..914eb7ed 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -53,12 +53,12 @@ type segmentCache struct { segmentID segmentID } -func (sc *segmentCache) get(key EntryKey) any { +func (sc *segmentCache) get(key EntryKey) Sizable { key.segmentID = sc.segmentID return sc.groupCache.get(key) } -func (sc *segmentCache) put(key EntryKey, value any) { +func (sc *segmentCache) put(key EntryKey, value Sizable) { key.segmentID = sc.segmentID sc.groupCache.put(key, value) } diff --git a/banyand/internal/storage/shard.go b/banyand/internal/storage/shard.go index 2f892280..6b97f302 100644 --- a/banyand/internal/storage/shard.go +++ b/banyand/internal/storage/shard.go @@ -52,12 +52,12 @@ func NewShardCache(group string, segmentID segmentID, shardID common.ShardID) Ca } } -func (sc *shardCache) Get(key 
EntryKey) any { +func (sc *shardCache) Get(key EntryKey) Sizable { key.shardID = sc.shardID return sc.segmentCache.get(key) } -func (sc *shardCache) Put(key EntryKey, value any) { +func (sc *shardCache) Put(key EntryKey, value Sizable) { key.shardID = sc.shardID sc.segmentCache.put(key, value) } diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go index 4332be83..474bb255 100644 --- a/banyand/internal/storage/tsdb.go +++ b/banyand/internal/storage/tsdb.go @@ -81,12 +81,12 @@ type groupCache struct { group string } -func (gc *groupCache) get(key EntryKey) any { +func (gc *groupCache) get(key EntryKey) Sizable { key.group = gc.group return gc.Get(key) } -func (gc *groupCache) put(key EntryKey, value any) { +func (gc *groupCache) put(key EntryKey, value Sizable) { key.group = gc.group gc.Put(key, value) } diff --git a/banyand/measure/block_metadata.go b/banyand/measure/block_metadata.go index 0e3efe3c..b4796a63 100644 --- a/banyand/measure/block_metadata.go +++ b/banyand/measure/block_metadata.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "sort" + "unsafe" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" @@ -192,6 +193,31 @@ type blockMetadataArray struct { arr []blockMetadata } +func (bma *blockMetadataArray) Size() uint64 { + size := uint64(unsafe.Sizeof(*bma)) + size += uint64(len(bma.arr)) * uint64(unsafe.Sizeof(blockMetadata{})) + for i := range bma.arr { + bm := &bma.arr[i] + if bm.tagFamilies != nil { + size += uint64(len(bm.tagFamilies)) * uint64(unsafe.Sizeof(dataBlock{})) + for name := range bm.tagFamilies { + size += uint64(unsafe.Sizeof("")) + uint64(len(name)) + } + } + if bm.tagProjection != nil { + size += uint64(len(bm.tagProjection)) * uint64(unsafe.Sizeof(model.TagProjection{})) + for j := range bm.tagProjection { + tp := &bm.tagProjection[j] + size += uint64(unsafe.Sizeof("")) + uint64(len(tp.Family)) // Family string header + data + for _, name := range tp.Names { + 
size += uint64(unsafe.Sizeof("")) + uint64(len(name)) // Each name string header + data + } + } + } + } + return size +} + type timestampsMetadata struct { dataBlock min int64 diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go index fd2ec7aa..f91bae5d 100644 --- a/banyand/measure/svc_data.go +++ b/banyand/measure/svc_data.go @@ -156,7 +156,11 @@ func (s *dataSVC) PreRun(ctx context.Context) error { if val == nil { return errors.New("node id is empty") } - s.c = storage.NewServiceCacheWithConfig(s.cc) + if s.cc.MaxCacheSize == 0 { + s.c = storage.NewBypassCache() + } else { + s.c = storage.NewServiceCacheWithConfig(s.cc) + } node := val.(common.Node) s.schemaRepo = newDataSchemaRepo(s.dataPath, s, node.Labels) diff --git a/banyand/measure/svc_standalone.go b/banyand/measure/svc_standalone.go index b6504730..33834bc2 100644 --- a/banyand/measure/svc_standalone.go +++ b/banyand/measure/svc_standalone.go @@ -161,7 +161,11 @@ func (s *standalone) PreRun(ctx context.Context) error { if val == nil { return errors.New("node id is empty") } - s.c = storage.NewServiceCacheWithConfig(s.cc) + if s.cc.MaxCacheSize == 0 { + s.c = storage.NewBypassCache() + } else { + s.c = storage.NewServiceCacheWithConfig(s.cc) + } node := val.(common.Node) s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels, node.NodeID)