This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 5e585bcd Change the data storage path structure for property model (#955)
5e585bcd is described below
commit 5e585bcdcbe8c01275f91dc64f7a52c03369aab4
Author: mrproliu <[email protected]>
AuthorDate: Thu Jan 22 14:45:24 2026 +0800
Change the data storage path structure for property model (#955)
* Change the data storage path structure for the property model
---------
Co-authored-by: Gao Hongtao <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
CHANGES.md | 3 +
banyand/property/db.go | 164 ++++++++++++++++++++++--------
banyand/property/listener.go | 58 +++++++----
banyand/property/repair.go | 177 ++++++++++++++++-----------------
banyand/property/repair_gossip.go | 24 ++---
banyand/property/repair_gossip_test.go | 37 ++++---
banyand/property/repair_test.go | 51 ++++------
banyand/property/shard.go | 20 ++--
banyand/property/shard_test.go | 4 +-
banyand/property/test_helper.go | 2 +-
bydbctl/internal/cmd/property_test.go | 38 ++++---
docs/api-reference.md | 2 +
12 files changed, 340 insertions(+), 240 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index f7a9d528..8d8facb5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,6 +19,9 @@ Release Notes.
- Add replication integration test for measure.
- Activate the property repair mechanism by default.
- Add snapshot time retention policy to ensure the snapshot only can be deleted after the configured minimum age(time).
+- **Breaking Change**: Change the data storage path structure for property model:
+ - From: `<data-dir>/property/data/shard-<id>/...`
+ - To: `<data-dir>/property/data/<group>/shard-<id>/...`
### Bug Fixes
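For readers tracking the breaking change noted above, here is a minimal, illustrative Go sketch of how the old and new shard locations differ. The dataDir value, helper names, and the example group name are assumptions for the sketch only; the path segments mirror the changelog entry and the shard.go change further down.

package main

import (
    "fmt"
    "path/filepath"
)

// oldShardPath shows the pre-change layout: <data-dir>/property/data/shard-<id>/...
func oldShardPath(dataDir string, id int) string {
    return filepath.Join(dataDir, "property", "data", fmt.Sprintf("shard-%d", id))
}

// newShardPath shows the post-change layout: <data-dir>/property/data/<group>/shard-<id>/...
func newShardPath(dataDir, group string, id int) string {
    return filepath.Join(dataDir, "property", "data", group, fmt.Sprintf("shard-%d", id))
}

func main() {
    fmt.Println(oldShardPath("/tmp/banyandb", 0))               // /tmp/banyandb/property/data/shard-0
    fmt.Println(newShardPath("/tmp/banyandb", "test-group", 0)) // /tmp/banyandb/property/data/test-group/shard-0
}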
diff --git a/banyand/property/db.go b/banyand/property/db.go
index 162a8ddc..105711b0 100644
--- a/banyand/property/db.go
+++ b/banyand/property/db.go
@@ -52,6 +52,13 @@ var (
propertyScope = observability.RootScope.SubScope("property")
)
+type groupShards struct {
+ shards atomic.Pointer[[]*shard]
+ group string
+ location string
+ mu sync.RWMutex
+}
+
type database struct {
metadata metadata.Repo
omr observability.MetricsRegistry
@@ -59,7 +66,7 @@ type database struct {
lock fs.File
logger *logger.Logger
repairScheduler *repairScheduler
- sLst atomic.Pointer[[]*shard]
+ groups sync.Map
location string
repairBaseDir string
flushInterval time.Duration
@@ -129,18 +136,29 @@ func (db *database) load(ctx context.Context) error {
if db.closed.Load() {
return errors.New("database is closed")
}
- return walkDir(db.location, "shard-", func(suffix string) error {
- id, err := strconv.Atoi(suffix)
- if err != nil {
- return err
+ for _, groupDir := range lfs.ReadDir(db.location) {
+ if !groupDir.IsDir() {
+ continue
}
- _, err = db.loadShard(ctx, common.ShardID(id))
- return err
- })
+ groupName := groupDir.Name()
+ groupPath := filepath.Join(db.location, groupName)
+ walkErr := walkDir(groupPath, "shard-", func(suffix string) error {
+ id, parseErr := strconv.Atoi(suffix)
+ if parseErr != nil {
+ return parseErr
+ }
+ _, loadErr := db.loadShard(ctx, groupName, common.ShardID(id))
+ return loadErr
+ })
+ if walkErr != nil {
+ return walkErr
+ }
+ }
+ return nil
}
func (db *database) update(ctx context.Context, shardID common.ShardID, id []byte, property *propertyv1.Property) error {
- sd, err := db.loadShard(ctx, shardID)
+ sd, err := db.loadShard(ctx, property.Metadata.Group, shardID)
if err != nil {
return err
}
@@ -152,14 +170,18 @@ func (db *database) update(ctx context.Context, shardID common.ShardID, id []byt
}
func (db *database) delete(ctx context.Context, docIDs [][]byte) error {
- sLst := db.sLst.Load()
- if sLst == nil {
- return nil
- }
var err error
- for _, s := range *sLst {
- multierr.AppendInto(&err, s.delete(ctx, docIDs))
- }
+ db.groups.Range(func(_, value any) bool {
+ gs := value.(*groupShards)
+ sLst := gs.shards.Load()
+ if sLst == nil {
+ return true
+ }
+ for _, s := range *sLst {
+ multierr.AppendInto(&err, s.delete(ctx, docIDs))
+ }
+ return true
+ })
return err
}
@@ -168,14 +190,18 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
if err != nil {
return nil, err
}
- sLst := db.sLst.Load()
- if sLst == nil {
+ requestedGroups := make(map[string]bool, len(req.Groups))
+ for _, g := range req.Groups {
+ requestedGroups[g] = true
+ }
+ shards := db.collectGroupShards(requestedGroups)
+ if len(shards) == 0 {
return nil, nil
}
if req.OrderBy == nil {
var res []*queryProperty
- for _, s := range *sLst {
+ for _, s := range shards {
r, searchErr := s.search(ctx, iq, nil, int(req.Limit))
if searchErr != nil {
return nil, searchErr
@@ -185,8 +211,8 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
return res, nil
}
- iters := make([]sort.Iterator[*queryProperty], 0, len(*sLst))
- for _, s := range *sLst {
+ iters := make([]sort.Iterator[*queryProperty], 0, len(shards))
+ for _, s := range shards {
// Each shard returns pre-sorted results (via SeriesSort)
r, searchErr := s.search(ctx, iq, req.OrderBy, int(req.Limit))
if searchErr != nil {
@@ -216,34 +242,77 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
return result, nil
}
-func (db *database) loadShard(ctx context.Context, id common.ShardID) (*shard, error) {
+func (db *database) collectGroupShards(requestedGroups map[string]bool) []*shard {
+ var shards []*shard
+ db.groups.Range(func(key, value any) bool {
+ groupName := key.(string)
+ if len(requestedGroups) > 0 {
+ if _, ok := requestedGroups[groupName]; !ok {
+ return true
+ }
+ }
+ gs := value.(*groupShards)
+ sLst := gs.shards.Load()
+ if sLst == nil {
+ return true
+ }
+ shards = append(shards, *sLst...)
+ return true
+ })
+ return shards
+}
+
+func (db *database) loadShard(ctx context.Context, group string, id common.ShardID) (*shard, error) {
if db.closed.Load() {
return nil, errors.New("database is closed")
}
- if s, ok := db.getShard(id); ok {
+ if s, ok := db.getShard(group, id); ok {
return s, nil
}
db.mu.Lock()
defer db.mu.Unlock()
- if s, ok := db.getShard(id); ok {
+ if s, ok := db.getShard(group, id); ok {
return s, nil
}
- sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger), id, int64(db.flushInterval.Seconds()),
+
+ gs := db.getOrCreateGroupShards(group)
+ sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger),
+ group, id, int64(db.flushInterval.Seconds()),
int64(db.expireDelete.Seconds()), db.repairBaseDir, db.repairTreeSlotCount)
if err != nil {
return nil, err
}
- sLst := db.sLst.Load()
- if sLst == nil {
- sLst = &[]*shard{}
+
+ gs.mu.Lock()
+ sLst := gs.shards.Load()
+ var oldList []*shard
+ if sLst != nil {
+ oldList = *sLst
}
- *sLst = append(*sLst, sd)
- db.sLst.Store(sLst)
+ newList := make([]*shard, len(oldList)+1)
+ copy(newList, oldList)
+ newList[len(oldList)] = sd
+ gs.shards.Store(&newList)
+ gs.mu.Unlock()
return sd, nil
}
-func (db *database) getShard(id common.ShardID) (*shard, bool) {
- sLst := db.sLst.Load()
+func (db *database) getOrCreateGroupShards(group string) *groupShards {
+ gs := &groupShards{
+ group: group,
+ location: filepath.Join(db.location, group),
+ }
+ actual, _ := db.groups.LoadOrStore(group, gs)
+ return actual.(*groupShards)
+}
+
+func (db *database) getShard(group string, id common.ShardID) (*shard, bool) {
+ value, ok := db.groups.Load(group)
+ if !ok {
+ return nil, false
+ }
+ gs := value.(*groupShards)
+ sLst := gs.shards.Load()
if sLst == nil {
return nil, false
}
@@ -262,13 +331,18 @@ func (db *database) close() error {
if db.repairScheduler != nil {
db.repairScheduler.close()
}
- sLst := db.sLst.Load()
var err error
- if sLst != nil {
+ db.groups.Range(func(_, value any) bool {
+ gs := value.(*groupShards)
+ sLst := gs.shards.Load()
+ if sLst == nil {
+ return true
+ }
for _, s := range *sLst {
multierr.AppendInto(&err, s.close())
}
- }
+ return true
+ })
db.lock.Close()
return err
}
@@ -277,17 +351,21 @@ func (db *database) collect() {
if db.closed.Load() {
return
}
- sLst := db.sLst.Load()
- if sLst == nil {
- return
- }
- for _, s := range *sLst {
- s.store.CollectMetrics()
- }
+ db.groups.Range(func(_, value any) bool {
+ gs := value.(*groupShards)
+ sLst := gs.shards.Load()
+ if sLst == nil {
+ return true
+ }
+ for _, s := range *sLst {
+ s.store.CollectMetrics()
+ }
+ return true
+ })
}
func (db *database) repair(ctx context.Context, id []byte, shardID uint64, property *propertyv1.Property, deleteTime int64) error {
- s, err := db.loadShard(ctx, common.ShardID(shardID))
+ s, err := db.loadShard(ctx, property.Metadata.Group, common.ShardID(shardID))
if err != nil {
return errors.WithMessagef(err, "failed to load shard %d", id)
}
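A quick summary of the new in-memory layout in db.go: the single atomic shard list is replaced by a sync.Map keyed by group name, where each entry holds an atomically swappable, copy-on-write shard slice guarded by a per-group mutex for writers. The sketch below only illustrates that pattern; the shard stub, registry variable, and function names are stand-ins rather than the actual BanyanDB API.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// shard is a stand-in for the real property shard type.
type shard struct{ id int }

// groupShards mirrors the struct added in db.go: an immutable shard slice
// swapped atomically, with a mutex serializing writers.
type groupShards struct {
    shards atomic.Pointer[[]*shard]
    mu     sync.Mutex
}

// registry plays the role of database.groups (a sync.Map keyed by group name).
var registry sync.Map

// appendShard adds a shard to its group copy-on-write, as loadShard does:
// readers keep iterating the old slice while the new one is built and stored.
func appendShard(group string, sd *shard) {
    v, _ := registry.LoadOrStore(group, &groupShards{})
    gs := v.(*groupShards)
    gs.mu.Lock()
    defer gs.mu.Unlock()
    var old []*shard
    if p := gs.shards.Load(); p != nil {
        old = *p
    }
    next := make([]*shard, len(old)+1)
    copy(next, old)
    next[len(old)] = sd
    gs.shards.Store(&next)
}

// collect mimics collectGroupShards: gather shards across groups, optionally
// filtered by a set of requested group names.
func collect(requested map[string]bool) []*shard {
    var out []*shard
    registry.Range(func(key, value any) bool {
        if len(requested) > 0 && !requested[key.(string)] {
            return true
        }
        if p := value.(*groupShards).shards.Load(); p != nil {
            out = append(out, *p...)
        }
        return true
    })
    return out
}

func main() {
    appendShard("test-group", &shard{id: 0})
    appendShard("test-group", &shard{id: 1})
    fmt.Printf("shards: %d\n", len(collect(nil)))
}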
diff --git a/banyand/property/listener.go b/banyand/property/listener.go
index bfb13d03..a7985ed2 100644
--- a/banyand/property/listener.go
+++ b/banyand/property/listener.go
@@ -234,30 +234,46 @@ func (s *snapshotListener) Rev(ctx context.Context, message bus.Message) bus.Mes
defer s.snapshotMux.Unlock()
storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum, s.s.minFileSnapshotAge, s.s.lfs)
sn := s.snapshotName()
- shardsRef := s.s.db.sLst.Load()
- if shardsRef == nil {
- return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
- }
- shards := *shardsRef
- for _, shard := range shards {
- select {
- case <-ctx.Done():
- return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
- default:
+ var snapshotResult *databasev1.Snapshot
+ s.s.db.groups.Range(func(_, value any) bool {
+ gs := value.(*groupShards)
+ sLst := gs.shards.Load()
+ if sLst == nil {
+ return true
}
- snpDir := path.Join(s.s.snapshotDir, sn, storage.DataDir, filepath.Base(shard.location))
- lfs.MkdirPanicIfExist(snpDir, storage.DirPerm)
- err := shard.store.TakeFileSnapshot(snpDir)
- if err != nil {
- s.s.l.Error().Err(err).Str("shard",
filepath.Base(shard.location)).Msg("fail to take shard snapshot")
- return
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &databasev1.Snapshot{
- Name: sn,
- Catalog: commonv1.Catalog_CATALOG_PROPERTY,
- Error: err.Error(),
- })
+ for _, shardRef := range *sLst {
+ select {
+ case <-ctx.Done():
+ // Context canceled: record an error snapshot and stop iteration.
+ if err := ctx.Err(); err != nil {
+ snapshotResult = &databasev1.Snapshot{
+ Name: sn,
+ Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+ Error: err.Error(),
+ }
+ }
+ return false
+ default:
+ }
+ snpDir := path.Join(s.s.snapshotDir, sn, storage.DataDir, shardRef.group, filepath.Base(shardRef.location))
+ lfs.MkdirPanicIfExist(snpDir, storage.DirPerm)
+ snapshotErr := shardRef.store.TakeFileSnapshot(snpDir)
+ if snapshotErr != nil {
+ s.s.l.Error().Err(snapshotErr).Str("group",
shardRef.group).
+ Str("shard",
filepath.Base(shardRef.location)).Msg("fail to take shard snapshot")
+ snapshotResult = &databasev1.Snapshot{
+ Name: sn,
+ Catalog:
commonv1.Catalog_CATALOG_PROPERTY,
+ Error: snapshotErr.Error(),
+ }
+ return false
+ }
}
+ return true
+ })
+ if snapshotResult != nil {
+ return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), snapshotResult)
}
-
return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &databasev1.Snapshot{
Name: sn,
Catalog: commonv1.Catalog_CATALOG_PROPERTY,
diff --git a/banyand/property/repair.go b/banyand/property/repair.go
index 381ab1f4..b5d141b3 100644
--- a/banyand/property/repair.go
+++ b/banyand/property/repair.go
@@ -73,7 +73,7 @@ type repair struct {
snapshotDir string
statePath string
composeSlotAppendFilePath string
- composeTreeFilePathFmt string
+ composeTreeFilePath string
treeSlotCount int
batchSearchSize int
}
@@ -94,7 +94,7 @@ func newRepair(
snapshotDir: path.Join(repairPath, "snapshot"),
statePath: path.Join(repairPath, "state.json"),
composeSlotAppendFilePath: path.Join(repairPath, "state-append-%d.tmp"),
- composeTreeFilePathFmt: path.Join(repairPath, "state-tree-%s.data"),
+ composeTreeFilePath: path.Join(repairPath, "state-tree.data"),
batchSearchSize: batchSearchSize,
metrics: newRepairMetrics(metricsFactory),
scheduler: scheduler,
@@ -124,8 +124,8 @@ func (r *repair) checkHasUpdates() (bool, error) {
return true, nil
}
-func (r *repair) buildStatus(ctx context.Context, snapshotPath string, group string) (err error) {
- r.l.Debug().Msgf("starting building status from snapshot path %s, group: %s", snapshotPath, group)
+func (r *repair) buildStatus(ctx context.Context, snapshotPath string) (err error) {
+ r.l.Debug().Msgf("starting building status from snapshot path %s", snapshotPath)
startTime := time.Now()
defer func() {
r.metrics.totalBuildTreeFinished.Inc(1)
@@ -142,15 +142,10 @@ func (r *repair) buildStatus(ctx context.Context, snapshotPath string, group str
sort.Sort(snapshotIDList(items))
blugeConf := bluge.DefaultConfig(snapshotPath)
- err = r.buildTree(ctx, blugeConf, group)
+ err = r.buildTree(ctx, blugeConf)
if err != nil {
return fmt.Errorf("building trees failure: %w", err)
}
- // if only update a specific group, the repair base status file doesn't need to update
- // because not all the group have been processed
- if group != "" {
- return nil
- }
var latestSnapshotID uint64
if len(items) > 0 {
@@ -175,7 +170,7 @@ func (r *repair) buildStatus(ctx context.Context, snapshotPath string, group str
return nil
}
-func (r *repair) buildTree(ctx context.Context, conf bluge.Config, group string) error {
+func (r *repair) buildTree(ctx context.Context, conf bluge.Config) error {
reader, err := bluge.OpenReader(conf)
if err != nil {
// means no data found
@@ -188,9 +183,6 @@ func (r *repair) buildTree(ctx context.Context, conf bluge.Config, group string)
_ = reader.Close()
}()
query := bluge.Query(bluge.NewMatchAllQuery())
- if group != "" {
- query = bluge.NewTermQuery(group).SetField(groupField)
- }
topNSearch := bluge.NewTopNSearch(r.batchSearchSize, query)
topNSearch.SortBy([]string{
fmt.Sprintf("+%s", groupField),
@@ -200,32 +192,19 @@ func (r *repair) buildTree(ctx context.Context, conf bluge.Config, group string)
})
var latestProperty *searchingProperty
- treeComposer := newRepairTreeComposer(r.composeSlotAppendFilePath, r.composeTreeFilePathFmt, r.treeSlotCount, r.l)
+ treeComposer := newRepairTreeComposer(r.composeSlotAppendFilePath, r.composeTreeFilePath, r.treeSlotCount, r.l)
err = r.pageSearch(ctx, reader, topNSearch, func(sortValue [][]byte, shaValue string) error {
if len(sortValue) != 4 {
return fmt.Errorf("unexpected sort value length: %d", len(sortValue))
}
- group := convert.BytesToString(sortValue[0])
+ groupName := convert.BytesToString(sortValue[0])
name := convert.BytesToString(sortValue[1])
- entity := r.buildLeafNodeEntity(group, name, convert.BytesToString(sortValue[2]))
-
- s := newSearchingProperty(group, shaValue, entity)
- if latestProperty != nil {
- if latestProperty.group != group {
- // if the group have changed, we need to append the latest property to the tree composer, and compose builder
- // the entity is changed, need to save the property
- if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
- return fmt.Errorf("appending property to tree composer failure: %w", err)
- }
- err = treeComposer.composeNextGroupAndSave(latestProperty.group)
- if err != nil {
- return fmt.Errorf("composing group failure: %w", err)
- }
- } else if latestProperty.entityID != entity {
- // the entity is changed, need to save the property
- if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
- return fmt.Errorf("appending property to tree composer failure: %w", err)
- }
+ entity := r.buildLeafNodeEntity(groupName, name, convert.BytesToString(sortValue[2]))
+
+ s := newSearchingProperty(groupName, shaValue, entity)
+ if latestProperty != nil && latestProperty.entityID != entity {
+ if appendErr := treeComposer.append(latestProperty.entityID, latestProperty.shaValue); appendErr != nil {
+ return fmt.Errorf("appending property to tree composer failure: %w", appendErr)
}
}
latestProperty = s
@@ -239,9 +218,8 @@ func (r *repair) buildTree(ctx context.Context, conf bluge.Config, group string)
if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
return fmt.Errorf("appending latest property to tree composer failure: %w", err)
}
- err = treeComposer.composeNextGroupAndSave(latestProperty.group)
- if err != nil {
- return fmt.Errorf("composing last group failure: %w", err)
+ if err = treeComposer.composeAndSave(); err != nil {
+ return fmt.Errorf("composing tree failure: %w", err)
}
}
@@ -371,17 +349,15 @@ type repairTreeFileReader struct {
paging *repairTreeReaderPage
}
-func (r *repair) treeReader(group string) (repairTreeReader, error) {
+func (r *repair) treeReader() (repairTreeReader, error) {
r.scheduler.treeLocker.RLock()
defer r.scheduler.treeLocker.RUnlock()
- groupFile := fmt.Sprintf(r.composeTreeFilePathFmt, group)
- file, err := os.OpenFile(groupFile, os.O_RDONLY, os.ModePerm)
+ file, err := os.OpenFile(r.composeTreeFilePath, os.O_RDONLY, os.ModePerm)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
- // if the file does not exist, it means no repair tree for this group
return nil, nil
}
- return nil, fmt.Errorf("opening repair tree file %s failure: %w", group, err)
+ return nil, fmt.Errorf("opening repair tree file failure: %w", err)
}
reader := &repairTreeFileReader{
file: file,
@@ -389,7 +365,7 @@ func (r *repair) treeReader(group string) (repairTreeReader, error) {
}
if err = reader.readFoot(); err != nil {
_ = file.Close()
- return nil, fmt.Errorf("reading footer from repair tree file %s
failure: %w", groupFile, err)
+ return nil, fmt.Errorf("reading footer from repair tree file
failure: %w", err)
}
return reader, nil
}
@@ -679,15 +655,15 @@ func (s snapshotIDList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
type repairTreeComposer struct {
l *logger.Logger
appendFileFmt string
- treeFileFmt string
+ treeFilePath string
slotFiles []*repairSlotFile
slotCount int
}
-func newRepairTreeComposer(appendSlotFilePathFmt, treeFilePathFmt string, slotCount int, l *logger.Logger) *repairTreeComposer {
+func newRepairTreeComposer(appendSlotFilePathFmt, treeFilePath string, slotCount int, l *logger.Logger) *repairTreeComposer {
return &repairTreeComposer{
appendFileFmt: appendSlotFilePathFmt,
- treeFileFmt: treeFilePathFmt,
+ treeFilePath: treeFilePath,
slotCount: slotCount,
slotFiles: make([]*repairSlotFile, slotCount),
l: l,
@@ -710,31 +686,30 @@ func (r *repairTreeComposer) append(id, shaVal string) (err error) {
return file.append(idBytes, shaValBytes)
}
-// composeNextGroupAndSave composes the current group of slot files into a repair tree file.
+// composeAndSave composes the slot files into a repair tree file.
// tree file format: [leaf nodes]+[slot nodes]+[root node]+[metadata]
// leaf nodes: each node contains: [entity]+[sha value]
// slot nodes: each node contains: [slot index]+[sha value]+[leaf nodes start offset]+[leaf nodes count]
// root node: contains [sha value]
// metadata: contains footer([slot nodes start offset]+[slot nodes count]+[root node start offset]+[root node length])+[footer length(data binary)]
-func (r *repairTreeComposer) composeNextGroupAndSave(group string) (err error) {
- treeFilePath := fmt.Sprintf(r.treeFileFmt, group)
- treeBuilder, err := newRepairTreeBuilder(treeFilePath)
+func (r *repairTreeComposer) composeAndSave() (err error) {
+ treeBuilder, err := newRepairTreeBuilder(r.treeFilePath)
if err != nil {
- return fmt.Errorf("creating repair tree builder for group %s
failure: %w", group, err)
+ return fmt.Errorf("creating repair tree builder failure: %w",
err)
}
defer func() {
if closeErr := treeBuilder.close(); closeErr != nil {
- err = multierr.Append(err, fmt.Errorf("closing repair
tree builder for group %s failure: %w", group, closeErr))
+ err = multierr.Append(err, fmt.Errorf("closing repair
tree builder failure: %w", closeErr))
}
}()
- for i, f := range r.slotFiles {
+ for idx, f := range r.slotFiles {
if f == nil {
continue
}
if err = treeBuilder.appendSlot(f); err != nil {
- return fmt.Errorf("appending slot file %s to repair
tree builder for group %s failure: %w", f.path, group, err)
+ return fmt.Errorf("appending slot file %s to repair
tree builder failure: %w", f.path, err)
}
- r.slotFiles[i] = nil
+ r.slotFiles[idx] = nil
}
return treeBuilder.build()
}
@@ -1132,19 +1107,27 @@ func (r *repairScheduler) doBuildTree() (err error) {
r.l.Err(saveStatusErr).Msgf("saving repair build tree
status failure")
}
}()
- sLst := r.db.sLst.Load()
- if sLst == nil {
- return nil
- }
hasUpdates := false
- for _, s := range *sLst {
- hasUpdates, err = s.repairState.checkHasUpdates()
- if err != nil {
- return err
+ var checkErr error
+ r.db.groups.Range(func(_, value any) bool {
+ gs := value.(*groupShards)
+ sLst := gs.shards.Load()
+ if sLst == nil {
+ return true
}
- if hasUpdates {
- break
+ for _, s := range *sLst {
+ hasUpdates, checkErr = s.repairState.checkHasUpdates()
+ if checkErr != nil {
+ return false
+ }
+ if hasUpdates {
+ return false
+ }
}
+ return true
+ })
+ if checkErr != nil {
+ return checkErr
}
// if no updates, skip the repair
if !hasUpdates {
@@ -1166,41 +1149,53 @@ func (r *repairScheduler) buildingTree(shards []common.ShardID, group string, fo
}
defer r.treeLocker.Unlock()
- buildAll := len(shards) == 0
-
- // take a new snapshot first
snapshotPath, err := r.buildSnapshotFunc(r.closer.Ctx())
if err != nil {
return fmt.Errorf("taking snapshot failure: %w", err)
}
- return walkDir(snapshotPath, "shard-", func(suffix string) error {
- id, err := strconv.Atoi(suffix)
- if err != nil {
- return err
+
+ for _, groupDir := range lfs.ReadDir(snapshotPath) {
+ if !groupDir.IsDir() {
+ continue
}
- if !buildAll {
+ groupName := groupDir.Name()
+ if group != "" && groupName != group {
+ continue
+ }
+ groupPath := path.Join(snapshotPath, groupName)
+ walkErr := walkDir(groupPath, "shard-", func(suffix string)
error {
+ id, parseErr := strconv.Atoi(suffix)
+ if parseErr != nil {
+ return parseErr
+ }
// if not building all shards, check if the shard is in the list
- found := false
- for _, s := range shards {
- if s == common.ShardID(id) {
- found = true
- break
+ if len(shards) > 0 {
+ found := false
+ for _, s := range shards {
+ if s == common.ShardID(id) {
+ found = true
+ break
+ }
+ }
+ if !found {
+ return nil // skip this shard
}
}
- if !found {
- return nil // skip this shard
+ s, loadErr := r.db.loadShard(r.closer.Ctx(), groupName, common.ShardID(id))
+ if loadErr != nil {
+ return fmt.Errorf("loading shard %d failure: %w", id, loadErr)
}
+ buildErr := s.repairState.buildStatus(r.closer.Ctx(), path.Join(groupPath, fmt.Sprintf("shard-%s", suffix)))
+ if buildErr != nil {
+ return fmt.Errorf("building status for shard %d failure: %w", id, buildErr)
+ }
+ return nil
+ })
+ if walkErr != nil {
+ return walkErr
}
- s, err := r.db.loadShard(r.closer.Ctx(), common.ShardID(id))
- if err != nil {
- return fmt.Errorf("loading shard %d failure: %w", id, err)
- }
- err = s.repairState.buildStatus(r.closer.Ctx(), path.Join(snapshotPath, fmt.Sprintf("shard-%s", suffix)), group)
- if err != nil {
- return fmt.Errorf("building status for shard %d failure: %w", id, err)
- }
- return nil
- })
+ }
+ return nil
}
func (r *repairScheduler) documentUpdatesNotify() {
diff --git a/banyand/property/repair_gossip.go b/banyand/property/repair_gossip.go
index e48d0f56..1f4af6a4 100644
--- a/banyand/property/repair_gossip.go
+++ b/banyand/property/repair_gossip.go
@@ -43,25 +43,25 @@ type repairGossipBase struct {
}
func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shardID uint32) (repairTreeReader, bool, error) {
- s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID))
+ s, err := b.scheduler.db.loadShard(ctx, group, common.ShardID(shardID))
if err != nil {
return nil, false, fmt.Errorf("failed to load shard %d: %w",
shardID, err)
}
- tree, err := s.repairState.treeReader(group)
+ tree, err := s.repairState.treeReader()
if err != nil {
return nil, false, fmt.Errorf("failed to get tree reader for
group %s: %w", group, err)
}
if tree == nil {
// if the tree is nil, but the state file exist, means the tree(group) is empty
- stateExist, err := s.repairState.stateFileExist()
- if err != nil {
- return nil, false, fmt.Errorf("failed to check state
file existence for group %s: %w", group, err)
+ stateExist, stateErr := s.repairState.stateFileExist()
+ if stateErr != nil {
+ return nil, false, fmt.Errorf("failed to check state
file existence for group %s: %w", group, stateErr)
}
if !stateExist {
// check has scheduled or not
- stateExist, err = b.scheduler.checkHasBuildTree()
- if err != nil {
- return nil, false, fmt.Errorf("failed to check
if the tree state file exists: %w", err)
+ stateExist, stateErr = b.scheduler.checkHasBuildTree()
+ if stateErr != nil {
+ return nil, false, fmt.Errorf("failed to check
if the tree state file exists: %w", stateErr)
}
}
// if the tree is nil, it means the tree is no data
@@ -256,9 +256,9 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN
return nil
}
- syncShard, err := r.scheduler.db.loadShard(ctx, common.ShardID(request.ShardId))
- if err != nil {
- return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load failure on client side: %v", request.ShardId, err)
+ syncShard, loadShardErr := r.scheduler.db.loadShard(ctx, request.Group, common.ShardID(request.ShardId))
+ if loadShardErr != nil {
+ return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load failure on client side: %v", request.ShardId, loadShardErr)
}
firstTreeSummaryResp := true
@@ -924,7 +924,7 @@ func (r *repairGossipServer) recvMsgAndWaitReadNextDiffer(
}
return fmt.Errorf("failed to receive missing or sync
request: %w", err)
}
- syncShard, err := r.scheduler.db.loadShard(s.Context(), common.ShardID(shardID))
+ syncShard, err := r.scheduler.db.loadShard(s.Context(), group, common.ShardID(shardID))
if err != nil {
return fmt.Errorf("shard %d load failure on server side: %w", shardID, err)
}
diff --git a/banyand/property/repair_gossip_test.go b/banyand/property/repair_gossip_test.go
index d9923a50..68b4db09 100644
--- a/banyand/property/repair_gossip_test.go
+++ b/banyand/property/repair_gossip_test.go
@@ -406,15 +406,22 @@ func startEachNode(ctrl *gomock.Controller, node node, groups []group) *nodeCont
return "", newSpaceErr
}
result.appendStop(defFunc)
- sList := db.sLst.Load()
var snpError error
- for _, s := range *sList {
- snpDir := path.Join(snapshotDir, filepath.Base(s.location))
- lfs.MkdirPanicIfExist(snpDir, storage.DirPerm)
- if e := s.store.TakeFileSnapshot(snpDir); e != nil {
- snpError = multierr.Append(snpError, e)
+ db.groups.Range(func(_, value any) bool {
+ gs := value.(*groupShards)
+ sLst := gs.shards.Load()
+ if sLst == nil {
+ return true
}
- }
+ for _, s := range *sLst {
+ snpDir := path.Join(snapshotDir, s.group, filepath.Base(s.location))
+ lfs.MkdirPanicIfExist(snpDir, storage.DirPerm)
+ if e := s.store.TakeFileSnapshot(snpDir); e != nil {
+ snpError = multierr.Append(snpError, e)
+ }
+ }
+ return true
+ })
return snapshotDir, snpError
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
@@ -441,11 +448,13 @@ func startEachNode(ctrl *gomock.Controller, node node, groups []group) *nodeCont
result.appendStop(messenger.GracefulStop)
- // initialize shard in to db
- for i := int32(0); i < node.shardCount; i++ {
- shardID := common.ShardID(i)
- _, err = db.loadShard(context.Background(), shardID)
- gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ // initialize shard in to db for each group
+ for _, g := range groups {
+ for i := int32(0); i < node.shardCount; i++ {
+ shardID := common.ShardID(i)
+ _, err = db.loadShard(context.Background(), g.name, shardID)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ }
}
// adding data to the node
@@ -477,7 +486,7 @@ func (w *repairGossipClientWrapper) Rev(ctx context.Context, t gossip.Trace, nex
}
func applyPropertyUpdate(db *database, p property) {
- s, err := db.loadShard(context.Background(), common.ShardID(p.shard))
+ s, err := db.loadShard(context.Background(), p.group, common.ShardID(p.shard))
gomega.Expect(err).NotTo(gomega.HaveOccurred())
update := &propertyv1.Property{
Metadata: &commonv1.Metadata{
@@ -496,7 +505,7 @@ func applyPropertyUpdate(db *database, p property) {
}
func queryPropertyWithVerify(db *database, p property) {
- s, err := db.loadShard(context.Background(), common.ShardID(p.shard))
+ s, err := db.loadShard(context.Background(), p.group, common.ShardID(p.shard))
gomega.Expect(err).NotTo(gomega.HaveOccurred())
query, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
diff --git a/banyand/property/repair_test.go b/banyand/property/repair_test.go
index 3926d31d..552f0af3 100644
--- a/banyand/property/repair_test.go
+++ b/banyand/property/repair_test.go
@@ -109,22 +109,6 @@ func TestBuildTree(t *testing.T) {
verifyContainsProperty(t, s, data, defaultGroupName, propertyBuilder{id: "test3"})
},
},
- {
- name: "build with multiple groups",
- existingDoc: func(s *shard) ([]index.Document, error) {
- return buildPropertyDocuments(s,
- propertyBuilder{group: "group1", id:
"test1"},
- propertyBuilder{group: "group2", id:
"test2"},
- propertyBuilder{group: "group3", id:
"test3"},
- )
- },
- statusVerify: func(t *testing.T, s *shard, data
*repairData) {
- basicStatusVerify(t, data, "group1", 1,
"group2", 1, "group3", 1)
- verifyContainsProperty(t, s, data, "group1",
propertyBuilder{group: "group1", id: "test1"})
- verifyContainsProperty(t, s, data, "group2",
propertyBuilder{group: "group2", id: "test2"})
- verifyContainsProperty(t, s, data, "group3",
propertyBuilder{group: "group3", id: "test3"})
- },
- },
{
name: "build multiple times",
existingDoc: func(s *shard) ([]index.Document, error) {
@@ -211,7 +195,7 @@ func TestBuildTree(t *testing.T) {
_ = db.close()
})
- newShard, err := db.loadShard(context.Background(), 0)
+ newShard, err := db.loadShard(context.Background(), defaultGroupName, 0)
if err != nil {
t.Fatal(err)
}
@@ -301,7 +285,7 @@ func TestDocumentUpdatesNotify(t *testing.T) {
_ = db.close()
})
- newShard, err := db.loadShard(context.Background(), 0)
+ newShard, err := db.loadShard(context.Background(), defaultGroupName, 0)
if err != nil {
t.Fatal(err)
}
@@ -314,7 +298,7 @@ func TestDocumentUpdatesNotify(t *testing.T) {
// wait for the repair tree to be built
gomega.Eventually(func() bool {
- tree, _ := newShard.repairState.treeReader(defaultGroupName)
+ tree, _ := newShard.repairState.treeReader()
if tree != nil {
_ = tree.close()
}
@@ -501,16 +485,17 @@ func newRepairData(repair *repair, status *repairStatus) *repairData {
}
}
-func (r *repairData) readTree(t *testing.T, group string) *repairTestTree {
- if tree, exist := r.cache[group]; exist {
+func (r *repairData) readTree(t *testing.T, _ string) *repairTestTree {
+ const cacheKey = "tree"
+ if tree, exist := r.cache[cacheKey]; exist {
return tree
}
if r.readCacheOnly {
- t.Fatalf("readTree called for group %s, but cache only mode is
enabled", group)
+ t.Fatalf("readTree called, but cache only mode is enabled")
}
- reader, err := r.repair.treeReader(group)
+ reader, err := r.repair.treeReader()
if err != nil {
- t.Fatalf("failed to get tree reader for group %s: %v", group,
err)
+ t.Fatalf("failed to get tree reader: %v", err)
}
if reader == nil {
return nil
@@ -521,10 +506,10 @@ func (r *repairData) readTree(t *testing.T, group string) *repairTestTree {
roots, err := reader.read(nil, 10, false)
if err != nil {
- t.Fatalf("failed to read tree for group %s: %v", group, err)
+ t.Fatalf("failed to read tree: %v", err)
}
if len(roots) == 0 {
- t.Fatalf("expected at least one root for group %s, but got
none", group)
+ t.Fatalf("expected at least one root, but got none")
}
tree := &repairTestTree{
root: &repairTestTreeNode{
@@ -534,10 +519,10 @@ func (r *repairData) readTree(t *testing.T, group string) *repairTestTree {
}
slots, err := reader.read(roots[0], 10, false)
if err != nil {
- t.Fatalf("failed to read slots for group %s: %v", group, err)
+ t.Fatalf("failed to read slots: %v", err)
}
if len(slots) == 0 {
- t.Fatalf("expected at least one slot for group %s, but got
none", group)
+ t.Fatalf("expected at least one slot, but got none")
}
for _, slot := range slots {
slotNode := &repairTestTreeNode{
@@ -545,12 +530,12 @@ func (r *repairData) readTree(t *testing.T, group string) *repairTestTree {
shaValue: slot.shaValue,
}
tree.root.children = append(tree.root.children, slotNode)
- children, err := reader.read(slot, 10, false)
- if err != nil {
- t.Fatalf("failed to read children for slot %d in group
%s: %v", slot.slotInx, group, err)
+ children, readErr := reader.read(slot, 10, false)
+ if readErr != nil {
+ t.Fatalf("failed to read children for slot %d: %v",
slot.slotInx, readErr)
}
if len(children) == 0 {
- t.Fatalf("expected at least one child for slot %d in
group %s, but got none", slot.slotInx, group)
+ t.Fatalf("expected at least one child for slot %d, but
got none", slot.slotInx)
}
for _, child := range children {
childNode := &repairTestTreeNode{
@@ -561,7 +546,7 @@ func (r *repairData) readTree(t *testing.T, group string) *repairTestTree {
}
}
- r.cache[group] = tree
+ r.cache[cacheKey] = tree
return tree
}
diff --git a/banyand/property/shard.go b/banyand/property/shard.go
index 3568ffd8..93202940 100644
--- a/banyand/property/shard.go
+++ b/banyand/property/shard.go
@@ -70,12 +70,12 @@ var (
)
type shard struct {
- store index.SeriesStore
- l *logger.Logger
- repairState *repair
- location string
- id common.ShardID
-
+ store index.SeriesStore
+ l *logger.Logger
+ repairState *repair
+ location string
+ group string
+ id common.ShardID
expireToDeleteSec int64
}
@@ -88,21 +88,23 @@ func (s *shard) close() error {
func (db *database) newShard(
ctx context.Context,
+ group string,
id common.ShardID,
_ int64,
deleteExpireSec int64,
repairBaseDir string,
repairTreeSlotCount int,
) (*shard, error) {
- location := path.Join(db.location, fmt.Sprintf(shardTemplate, int(id)))
+ location := path.Join(db.location, group, fmt.Sprintf(shardTemplate, int(id)))
sName := "shard" + strconv.Itoa(int(id))
si := &shard{
id: id,
+ group: group,
l: logger.Fetch(ctx, sName),
location: location,
expireToDeleteSec: deleteExpireSec,
}
- metricsFactory := db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard": sName}))
+ metricsFactory := db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"group": group, "shard": sName}))
opts := inverted.StoreOpts{
Path: location,
Logger: si.l,
@@ -114,7 +116,7 @@ func (db *database) newShard(
if si.store, err = inverted.NewStore(opts); err != nil {
return nil, err
}
- repairBaseDir = path.Join(repairBaseDir, sName)
+ repairBaseDir = path.Join(repairBaseDir, group, sName)
si.repairState = newRepair(location, repairBaseDir, logger.Fetch(ctx, fmt.Sprintf("repair%d", id)),
metricsFactory, repairBatchSearchSize, repairTreeSlotCount, db.repairScheduler)
return si, nil
diff --git a/banyand/property/shard_test.go b/banyand/property/shard_test.go
index 1ec4f7ea..9aefb9ed 100644
--- a/banyand/property/shard_test.go
+++ b/banyand/property/shard_test.go
@@ -110,7 +110,7 @@ func TestMergeDeleted(t *testing.T) {
_ = db.close()
})
- newShard, err := db.loadShard(context.Background(), 0)
+ newShard, err := db.loadShard(context.Background(), testPropertyGroup, 0)
if err != nil {
t.Fatal(err)
}
@@ -319,7 +319,7 @@ func TestRepair(t *testing.T) {
_ = db.close()
})
- newShard, err := db.loadShard(context.Background(), 0)
+ newShard, err := db.loadShard(context.Background(), testPropertyGroup, 0)
if err != nil {
t.Fatal(err)
}
diff --git a/banyand/property/test_helper.go b/banyand/property/test_helper.go
index 677ab925..62ba0af1 100644
--- a/banyand/property/test_helper.go
+++ b/banyand/property/test_helper.go
@@ -51,7 +51,7 @@ func CreateTestShardForDump(tmpPath string, fileSystem fs.FileSystem) (string, f
}
// Load shard 0
- shard, err := db.loadShard(context.Background(), 0)
+ shard, err := db.loadShard(context.Background(), "test-group", 0)
if err != nil {
db.close()
panic(err)
diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go
index 1459f1c0..ee383e22 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -1028,26 +1028,36 @@ func deleteData(rootCmd *cobra.Command, addr, group, name, id string, success bo
}
func generateInvertedStore(rootPath string) (index.SeriesStore, error) {
- shardParent := path.Join(rootPath, "property", "data")
- list, err := os.ReadDir(shardParent)
+ dataParent := path.Join(rootPath, "property", "data")
+ groupList, err := os.ReadDir(dataParent)
if err != nil {
- return nil, fmt.Errorf("read dir %s error: %w", shardParent,
err)
+ return nil, fmt.Errorf("read dir %s error: %w", dataParent, err)
}
- if len(list) == 0 {
- return nil, fmt.Errorf("no shard found in %s", shardParent)
+ if len(groupList) == 0 {
+ return nil, fmt.Errorf("no group found in %s", dataParent)
}
- for _, e := range list {
- if !e.Type().IsDir() {
+ for _, groupEntry := range groupList {
+ if !groupEntry.Type().IsDir() {
continue
}
- _, found := strings.CutPrefix(e.Name(), "shard-")
- if !found {
+ groupPath := path.Join(dataParent, groupEntry.Name())
+ shardList, readErr := os.ReadDir(groupPath)
+ if readErr != nil {
continue
}
- return inverted.NewStore(
- inverted.StoreOpts{
- Path: path.Join(shardParent, e.Name()),
- })
+ for _, shardEntry := range shardList {
+ if !shardEntry.Type().IsDir() {
+ continue
+ }
+ _, found := strings.CutPrefix(shardEntry.Name(), "shard-")
+ if !found {
+ continue
+ }
+ return inverted.NewStore(
+ inverted.StoreOpts{
+ Path: path.Join(groupPath, shardEntry.Name()),
+ })
+ }
}
return nil, fmt.Errorf("no shard found in %s", rootPath)
}
@@ -1084,7 +1094,7 @@ func registerNodeToMessenger(m gossip.Messenger, nodeID, gossipRepairAddr string
}
func getRepairTreeFilePath(nodeDir, group string) string {
- return path.Join(nodeDir, "property", "repairs", "shard0",
fmt.Sprintf("state-tree-%s.data", group))
+ return path.Join(nodeDir, "property", "repairs", group, "shard0",
"state-tree.data")
}
func getRepairTreeModTime(nodeDir, group string) (time.Time, error) {
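The getRepairTreeFilePath change above pins the second on-disk move in this commit: the per-group repair tree files state-tree-<group>.data under a shared shard directory are replaced by a single state-tree.data inside a per-group shard directory. A minimal sketch of the two locations follows; the nodeDir value and the literal path segments simply mirror the test helper above, and everything else is illustrative rather than the real API.

package main

import (
    "fmt"
    "path/filepath"
)

// oldRepairTreePath: <node-dir>/property/repairs/shard<id>/state-tree-<group>.data
func oldRepairTreePath(nodeDir, group string, shardID int) string {
    return filepath.Join(nodeDir, "property", "repairs",
        fmt.Sprintf("shard%d", shardID), fmt.Sprintf("state-tree-%s.data", group))
}

// newRepairTreePath: <node-dir>/property/repairs/<group>/shard<id>/state-tree.data
func newRepairTreePath(nodeDir, group string, shardID int) string {
    return filepath.Join(nodeDir, "property", "repairs",
        group, fmt.Sprintf("shard%d", shardID), "state-tree.data")
}

func main() {
    fmt.Println(oldRepairTreePath("/tmp/node1", "test-group", 0))
    fmt.Println(newRepairTreePath("/tmp/node1", "test-group", 0))
}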
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 814da7f7..b42eb5ab 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -1111,6 +1111,8 @@ TopNList contains a series of topN items
| ----- | ---- | ----- | ----------- |
| entity | [banyandb.model.v1.Tag](#banyandb-model-v1-Tag) | repeated | |
| value | [banyandb.model.v1.FieldValue](#banyandb-model-v1-FieldValue) | | |
+| version | [int64](#int64) | | |
+| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | |