This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new df00ff2c A patch to the block cache (#727)
df00ff2c is described below

commit df00ff2c565d6db7f11875cf754da3db4a3fa25f
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Tue Aug 19 06:06:17 2025 +0700

    A patch to the block cache (#727)
    
    * Refactor cache implementation to use Sizable interface for memory management
    
    - Updated cache methods to accept and return Sizable objects, enhancing memory size tracking.
    - Enhanced blockMetadataArray with Size method to report its memory size accurately.
    
    * Add bypass cache implementation for zero cache size scenarios
    
    - Introduced a new bypassCache type that implements the Cache interface as a no-op cache.
    - Updated PreRun methods in dataSVC and standalone to use NewBypassCache when MaxCacheSize is set to zero, effectively disabling caching in that case.
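    
    A hedged sketch of the selection logic wired into PreRun below (the
    cacheFor helper and its placement in package measure are illustrative
    only; NewBypassCache, NewServiceCacheWithConfig, and CacheConfig come
    from this patch):
    
        package measure
        
        import "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        
        // cacheFor picks the cache implementation for a given configuration:
        // a zero MaxCacheSize disables caching via the no-op bypass cache,
        // any other value uses the size-tracking service cache.
        func cacheFor(cfg storage.CacheConfig) storage.Cache {
                if cfg.MaxCacheSize == 0 {
                        return storage.NewBypassCache()
                }
                return storage.NewServiceCacheWithConfig(cfg)
        }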
---
 banyand/internal/storage/cache.go      | 90 ++++++++++++++++++++++++++++------
 banyand/internal/storage/cache_test.go | 27 +++++++---
 banyand/internal/storage/segment.go    |  4 +-
 banyand/internal/storage/shard.go      |  4 +-
 banyand/internal/storage/tsdb.go       |  4 +-
 banyand/measure/block_metadata.go      | 26 ++++++++++
 banyand/measure/svc_data.go            |  6 ++-
 banyand/measure/svc_standalone.go      |  6 ++-
 8 files changed, 138 insertions(+), 29 deletions(-)

diff --git a/banyand/internal/storage/cache.go b/banyand/internal/storage/cache.go
index 5814b798..7a890d82 100644
--- a/banyand/internal/storage/cache.go
+++ b/banyand/internal/storage/cache.go
@@ -28,10 +28,15 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+// Sizable represents an object that can report its memory size.
+type Sizable interface {
+       Size() uint64
+}
+
 // Cache encapsulates the cache operations.
 type Cache interface {
-       Get(key EntryKey) any
-       Put(key EntryKey, value any)
+       Get(key EntryKey) Sizable
+       Put(key EntryKey, value Sizable)
        Close()
        Requests() uint64
        Misses() uint64
@@ -56,8 +61,9 @@ func DefaultCacheConfig() CacheConfig {
 }
 
 type entry struct {
-       value      any
+       value      Sizable
        lastAccess uint64
+       size       uint64
 }
 
 // EntryKey is the key of an entry in the cache.
@@ -97,14 +103,14 @@ func (h entryIndexHeap) Swap(i, j int) {
        h[j].index = j
 }
 
-func (h *entryIndexHeap) Push(x interface{}) {
+func (h *entryIndexHeap) Push(x any) {
        n := len(*h)
        ei := x.(*entryIndex)
        ei.index = n
        *h = append(*h, ei)
 }
 
-func (h *entryIndexHeap) Pop() interface{} {
+func (h *entryIndexHeap) Pop() any {
        old := *h
        n := len(old)
        x := old[n-1]
@@ -121,6 +127,7 @@ type serviceCache struct {
        stopCh          chan struct{}
        requests        uint64
        misses          uint64
+       currentSize     uint64
        mu              sync.RWMutex
        wg              sync.WaitGroup
        maxCacheSize    uint64
@@ -146,6 +153,7 @@ func NewServiceCacheWithConfig(config CacheConfig) Cache {
                maxCacheSize:    uint64(config.MaxCacheSize),
                cleanupInterval: config.CleanupInterval,
                idleTimeout:     config.IdleTimeout,
+               currentSize:     0,
        }
        sc.wg.Add(1)
        sc.startCleaner()
@@ -159,6 +167,17 @@ func (sc *serviceCache) startCleaner() {
        }()
 }
 
+func (sc *serviceCache) removeEntry(key EntryKey) {
+       if entry, exists := sc.entry[key]; exists {
+               atomic.AddUint64(&sc.currentSize, ^(entry.size - 1))
+               delete(sc.entry, key)
+               if ei, exists := sc.entryIndex[key]; exists && ei.index >= 0 && ei.index < sc.entryIndexHeap.Len() {
+                       heap.Remove(sc.entryIndexHeap, ei.index)
+                       delete(sc.entryIndex, key)
+               }
+       }
+}
+
 func (sc *serviceCache) clean() {
        ticker := time.NewTicker(sc.cleanupInterval)
        defer ticker.Stop()
@@ -169,9 +188,7 @@ func (sc *serviceCache) clean() {
                        sc.mu.Lock()
                        for key, entry := range sc.entry {
                                if now-atomic.LoadUint64(&entry.lastAccess) > uint64(sc.idleTimeout.Nanoseconds()) {
-                                       delete(sc.entry, key)
-                                       heap.Remove(sc.entryIndexHeap, sc.entryIndex[key].index)
-                                       delete(sc.entryIndex, key)
+                                       sc.removeEntry(key)
                                }
                        }
                        sc.mu.Unlock()
@@ -189,7 +206,7 @@ func (sc *serviceCache) Close() {
        sc.entryIndexHeap = nil
 }
 
-func (sc *serviceCache) Get(key EntryKey) any {
+func (sc *serviceCache) Get(key EntryKey) Sizable {
        atomic.AddUint64(&sc.requests, 1)
 
        sc.mu.RLock()
@@ -213,20 +230,29 @@ func (sc *serviceCache) Get(key EntryKey) any {
        return nil
 }
 
-func (sc *serviceCache) Put(key EntryKey, value any) {
+func (sc *serviceCache) Put(key EntryKey, value Sizable) {
        sc.mu.Lock()
        defer sc.mu.Unlock()
 
-       for sc.size() > sc.maxCacheSize && sc.len() > 0 {
+       valueSize := value.Size()
+       entryOverhead := uint64(unsafe.Sizeof(entry{}) + unsafe.Sizeof(entryIndex{}) + unsafe.Sizeof(key))
+       totalSize := valueSize + entryOverhead
+
+       if existing, exists := sc.entry[key]; exists {
+               atomic.AddUint64(&sc.currentSize, ^(existing.size - 1))
+               sc.removeEntry(key)
+       }
+
+       for atomic.LoadUint64(&sc.currentSize)+totalSize > sc.maxCacheSize && sc.len() > 0 {
                ei := heap.Pop(sc.entryIndexHeap).(*entryIndex)
-               delete(sc.entry, ei.key)
-               delete(sc.entryIndex, ei.key)
+               sc.removeEntry(ei.key)
        }
 
        now := uint64(time.Now().UnixNano())
        e := &entry{
                value:      value,
                lastAccess: now,
+               size:       totalSize,
        }
        ei := &entryIndex{
                key:   key,
@@ -235,6 +261,7 @@ func (sc *serviceCache) Put(key EntryKey, value any) {
        sc.entry[key] = e
        sc.entryIndex[key] = ei
        heap.Push(sc.entryIndexHeap, ei)
+       atomic.AddUint64(&sc.currentSize, totalSize)
 }
 
 func (sc *serviceCache) Requests() uint64 {
@@ -262,5 +289,40 @@ func (sc *serviceCache) Size() uint64 {
 }
 
 func (sc *serviceCache) size() uint64 {
-       return uint64(unsafe.Sizeof(*sc))
+       return atomic.LoadUint64(&sc.currentSize)
+}
+
+var _ Cache = (*bypassCache)(nil)
+
+type bypassCache struct{}
+
+// NewBypassCache creates a no-op cache implementation.
+func NewBypassCache() Cache {
+       return &bypassCache{}
+}
+
+func (bc *bypassCache) Get(_ EntryKey) Sizable {
+       return nil
+}
+
+func (bc *bypassCache) Put(_ EntryKey, _ Sizable) {
+}
+
+func (bc *bypassCache) Close() {
+}
+
+func (bc *bypassCache) Requests() uint64 {
+       return 0
+}
+
+func (bc *bypassCache) Misses() uint64 {
+       return 0
+}
+
+func (bc *bypassCache) Entries() uint64 {
+       return 0
+}
+
+func (bc *bypassCache) Size() uint64 {
+       return 0
 }
diff --git a/banyand/internal/storage/cache_test.go b/banyand/internal/storage/cache_test.go
index 0bf4eb3d..ad0149da 100644
--- a/banyand/internal/storage/cache_test.go
+++ b/banyand/internal/storage/cache_test.go
@@ -29,6 +29,19 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+// testSizableString is a test implementation of Sizable for string values.
+type testSizableString struct {
+       value string
+}
+
+func (s testSizableString) Size() uint64 {
+       return uint64(len(s.value)) + 16 // base struct overhead
+}
+
+func (s testSizableString) String() string {
+       return s.value
+}
+
 func TestCachePutAndGet(t *testing.T) {
        serviceCache := NewServiceCache()
 
@@ -39,7 +52,7 @@ func TestCachePutAndGet(t *testing.T) {
                segmentID: segmentID(0),
                shardID:   common.ShardID(0),
        }
-       value := "test-value"
+       value := testSizableString{value: "test-value"}
 
        serviceCache.Put(key, value)
        assert.Equal(t, uint64(1), serviceCache.Entries())
@@ -67,7 +80,7 @@ func TestCacheEvict(t *testing.T) {
        serviceCache.maxCacheSize = 50
 
        var expectedKey EntryKey
-       var expectedValue string
+       var expectedValue testSizableString
        for i := 0; i < 10; i++ {
                key := EntryKey{
                        group:     "test-group",
@@ -76,7 +89,7 @@ func TestCacheEvict(t *testing.T) {
                        segmentID: segmentID(i),
                        shardID:   common.ShardID(i),
                }
-               value := "test-value" + strconv.Itoa(i)
+               value := testSizableString{value: "test-value" + strconv.Itoa(i)}
                serviceCache.Put(key, value)
                if i == 9 {
                        expectedKey, expectedValue = key, value
@@ -104,7 +117,7 @@ func TestCacheClean(t *testing.T) {
                segmentID: segmentID(0),
                shardID:   common.ShardID(0),
        }
-       value := "test-value"
+       value := testSizableString{value: "test-value"}
 
        serviceCache.Put(key, value)
        assert.Equal(t, uint64(1), serviceCache.Entries())
@@ -123,7 +136,7 @@ func TestCacheClose(t *testing.T) {
                segmentID: segmentID(0),
                shardID:   common.ShardID(0),
        }
-       value := "test-value"
+       value := testSizableString{value: "test-value"}
 
        serviceCache.Put(key, value)
        assert.Equal(t, uint64(1), serviceCache.Entries())
@@ -140,7 +153,7 @@ func TestCacheConcurrency(t *testing.T) {
        const numOperations = 100
        var wg sync.WaitGroup
        var expectedKey EntryKey
-       var expectedValue string
+       var expectedValue testSizableString
        wg.Add(numGoroutines * 2)
        for i := 0; i < numGoroutines; i++ {
                go func(id int) {
@@ -154,7 +167,7 @@ func TestCacheConcurrency(t *testing.T) {
                                        segmentID: segmentID(num),
                                        shardID:   common.ShardID(num),
                                }
-                               value := "test-value" + strconv.Itoa(num)
+                               value := testSizableString{value: "test-value" + strconv.Itoa(num)}
                                if id == 0 && j == 0 {
                                        expectedKey, expectedValue = key, value
                                }
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 1c4c1ce0..914eb7ed 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -53,12 +53,12 @@ type segmentCache struct {
        segmentID segmentID
 }
 
-func (sc *segmentCache) get(key EntryKey) any {
+func (sc *segmentCache) get(key EntryKey) Sizable {
        key.segmentID = sc.segmentID
        return sc.groupCache.get(key)
 }
 
-func (sc *segmentCache) put(key EntryKey, value any) {
+func (sc *segmentCache) put(key EntryKey, value Sizable) {
        key.segmentID = sc.segmentID
        sc.groupCache.put(key, value)
 }
diff --git a/banyand/internal/storage/shard.go b/banyand/internal/storage/shard.go
index 2f892280..6b97f302 100644
--- a/banyand/internal/storage/shard.go
+++ b/banyand/internal/storage/shard.go
@@ -52,12 +52,12 @@ func NewShardCache(group string, segmentID segmentID, shardID common.ShardID) Ca
        }
 }
 
-func (sc *shardCache) Get(key EntryKey) any {
+func (sc *shardCache) Get(key EntryKey) Sizable {
        key.shardID = sc.shardID
        return sc.segmentCache.get(key)
 }
 
-func (sc *shardCache) Put(key EntryKey, value any) {
+func (sc *shardCache) Put(key EntryKey, value Sizable) {
        key.shardID = sc.shardID
        sc.segmentCache.put(key, value)
 }
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 4332be83..474bb255 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -81,12 +81,12 @@ type groupCache struct {
        group string
 }
 
-func (gc *groupCache) get(key EntryKey) any {
+func (gc *groupCache) get(key EntryKey) Sizable {
        key.group = gc.group
        return gc.Get(key)
 }
 
-func (gc *groupCache) put(key EntryKey, value any) {
+func (gc *groupCache) put(key EntryKey, value Sizable) {
        key.group = gc.group
        gc.Put(key, value)
 }
diff --git a/banyand/measure/block_metadata.go b/banyand/measure/block_metadata.go
index 0e3efe3c..b4796a63 100644
--- a/banyand/measure/block_metadata.go
+++ b/banyand/measure/block_metadata.go
@@ -21,6 +21,7 @@ import (
        "errors"
        "fmt"
        "sort"
+       "unsafe"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
@@ -192,6 +193,31 @@ type blockMetadataArray struct {
        arr []blockMetadata
 }
 
+func (bma *blockMetadataArray) Size() uint64 {
+       size := uint64(unsafe.Sizeof(*bma))
+       size += uint64(len(bma.arr)) * uint64(unsafe.Sizeof(blockMetadata{}))
+       for i := range bma.arr {
+               bm := &bma.arr[i]
+               if bm.tagFamilies != nil {
+                       size += uint64(len(bm.tagFamilies)) * uint64(unsafe.Sizeof(dataBlock{}))
+                       for name := range bm.tagFamilies {
+                               size += uint64(unsafe.Sizeof("")) + uint64(len(name))
+                       }
+               }
+               if bm.tagProjection != nil {
+                       size += uint64(len(bm.tagProjection)) * uint64(unsafe.Sizeof(model.TagProjection{}))
+                       for j := range bm.tagProjection {
+                               tp := &bm.tagProjection[j]
+                               size += uint64(unsafe.Sizeof("")) + uint64(len(tp.Family)) // Family string header + data
+                               for _, name := range tp.Names {
+                                       size += uint64(unsafe.Sizeof("")) + uint64(len(name)) // Each name string header + data
+                               }
+                       }
+               }
+       }
+       return size
+}
+
 type timestampsMetadata struct {
        dataBlock
        min               int64
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index fd2ec7aa..f91bae5d 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -156,7 +156,11 @@ func (s *dataSVC) PreRun(ctx context.Context) error {
        if val == nil {
                return errors.New("node id is empty")
        }
-       s.c = storage.NewServiceCacheWithConfig(s.cc)
+       if s.cc.MaxCacheSize == 0 {
+               s.c = storage.NewBypassCache()
+       } else {
+               s.c = storage.NewServiceCacheWithConfig(s.cc)
+       }
        node := val.(common.Node)
        s.schemaRepo = newDataSchemaRepo(s.dataPath, s, node.Labels)
 
diff --git a/banyand/measure/svc_standalone.go b/banyand/measure/svc_standalone.go
index b6504730..33834bc2 100644
--- a/banyand/measure/svc_standalone.go
+++ b/banyand/measure/svc_standalone.go
@@ -161,7 +161,11 @@ func (s *standalone) PreRun(ctx context.Context) error {
        if val == nil {
                return errors.New("node id is empty")
        }
-       s.c = storage.NewServiceCacheWithConfig(s.cc)
+       if s.cc.MaxCacheSize == 0 {
+               s.c = storage.NewBypassCache()
+       } else {
+               s.c = storage.NewServiceCacheWithConfig(s.cc)
+       }
        node := val.(common.Node)
        s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels, node.NodeID)
 
