Copilot commented on code in PR #955:
URL: https://github.com/apache/skywalking-banyandb/pull/955#discussion_r2715180526


##########
CHANGES.md:
##########
@@ -18,6 +18,9 @@ Release Notes.
 - Update the dump tool to support analyzing the parts with smeta files.
 - Activate the property repair mechanism by default.
 - Add snapshot time retention policy to ensure the snapshot only can be deleted after the configured minimum age(time).
+- **Breaking Change**: Change the data storage path structure for property model:
+  - From: `<data-dir>/property/<shard>/...`
+  - To: `<data-dir>/property/group/<shard>/...`

Review Comment:
   The new on-disk layout implemented by the code is `<data-dir>/property/<group>/<shard>/...` (the group name is a directory). The CHANGES entry currently says `<data-dir>/property/group/<shard>/...`, which reads like a literal `group` directory and is misleading; please update the text to match the actual layout (and consider including the `shard-<id>` naming).
   ```suggestion
     - From: `<data-dir>/property/shard-<id>/...`
     - To: `<data-dir>/property/<group>/shard-<id>/...`
   ```



##########
banyand/property/db.go:
##########
@@ -216,34 +248,87 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
        return result, nil
 }
 
-func (db *database) loadShard(ctx context.Context, id common.ShardID) (*shard, error) {
+func (db *database) collectGroupShards(groupsMap *map[string]*groupShards, requestedGroups map[string]bool) []*shard {
+       var shards []*shard
+       for groupName, gs := range *groupsMap {
+               if len(requestedGroups) > 0 {
+                       if _, ok := requestedGroups[groupName]; !ok {
+                               continue
+                       }
+               }
+               sLst := gs.shards.Load()
+               if sLst == nil {
+                       continue
+               }
+               shards = append(shards, *sLst...)
+       }
+       return shards
+}
+
+func (db *database) loadShard(ctx context.Context, group string, id common.ShardID) (*shard, error) {
        if db.closed.Load() {
                return nil, errors.New("database is closed")
        }
-       if s, ok := db.getShard(id); ok {
+       if s, ok := db.getShard(group, id); ok {
                return s, nil
        }
        db.mu.Lock()
        defer db.mu.Unlock()
-       if s, ok := db.getShard(id); ok {
+       if s, ok := db.getShard(group, id); ok {
                return s, nil
        }
-       sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger), id, int64(db.flushInterval.Seconds()),
+
+       gs := db.getOrCreateGroupShards(group)
+       sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger),
+               group, id, int64(db.flushInterval.Seconds()),
                int64(db.expireDelete.Seconds()), db.repairBaseDir, db.repairTreeSlotCount)
        if err != nil {
                return nil, err
        }
-       sLst := db.sLst.Load()
+
+       gs.mu.Lock()
+       sLst := gs.shards.Load()
        if sLst == nil {
                sLst = &[]*shard{}
        }
-       *sLst = append(*sLst, sd)
-       db.sLst.Store(sLst)
+       newList := append(*sLst, sd)
+       gs.shards.Store(&newList)

Review Comment:
   `newList := append(*sLst, sd)` can reuse the backing array of `*sLst` and 
mutate it in-place. Since readers access `gs.shards.Load()` without taking 
`gs.mu`, this can cause data races / inconsistent reads. Build the new slice 
with a guaranteed new backing array (e.g., copy to a fresh slice before 
appending) before storing it.
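   A minimal sketch of the copy-before-append fix, reusing the identifiers from this hunk (illustrative only, not an authoritative patch):
   ```go
   // Build the new slice on a fresh backing array so readers that already
   // loaded the previously stored *[]*shard never observe this append.
   newList := make([]*shard, 0, len(*sLst)+1)
   newList = append(newList, *sLst...)
   newList = append(newList, sd)
   gs.shards.Store(&newList)
   ```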



##########
banyand/property/repair.go:
##########
@@ -710,31 +686,30 @@ func (r *repairTreeComposer) append(id, shaVal string) (err error) {
        return file.append(idBytes, shaValBytes)
 }
 
-// composeNextGroupAndSave composes the current group of slot files into a repair tree file.
+// composeAndSave composes the slot files into a repair tree file.
 // tree file format: [leaf nodes]+[slot nodes]+[root node]+[metadata]
 // leaf nodes: each node contains: [entity]+[sha value]
 // slot nodes: each node contains: [slot index]+[sha value]+[leaf nodes start offset]+[leaf nodes count]
 // root node: contains [sha value]
 // metadata: contains footer([slot nodes start offset]+[slot nodes count]+[root node start offset]+[root node length])+[footer length(data binary)]
-func (r *repairTreeComposer) composeNextGroupAndSave(group string) (err error) {
-       treeFilePath := fmt.Sprintf(r.treeFileFmt, group)
-       treeBuilder, err := newRepairTreeBuilder(treeFilePath)
+func (r *repairTreeComposer) composeAndSave() (err error) {
+       treeBuilder, err := newRepairTreeBuilder(r.treeFilePath)
        if err != nil {
-               return fmt.Errorf("creating repair tree builder for group %s failure: %w", group, err)
+               return fmt.Errorf("creating repair tree builder failure: %w", err)
        }
        defer func() {
                if closeErr := treeBuilder.close(); closeErr != nil {
-                       err = multierr.Append(err, fmt.Errorf("closing repair tree builder for group %s failure: %w", group, closeErr))
+                       err = multierr.Append(err, fmt.Errorf("closing repair tree builder failure: %w", closeErr))
                }

Review Comment:
   With a single `state-tree.data` file being overwritten, make sure the tree 
file is truncated/rewritten atomically. `newRepairTreeBuilder` currently opens 
the file without `O_TRUNC` (and doesn’t call `Truncate(0)`), so rebuilding a 
smaller tree can leave stale bytes at the end and cause `treeReader()` to read 
the old footer. Fix by truncating/removing the file before writing (or opening 
with `os.O_TRUNC`).
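   A minimal sketch of the `os.O_TRUNC` option, assuming `newRepairTreeBuilder` opens the tree file itself (the exact open flags and permission bits in the PR may differ); writing to a temporary file and renaming it over `state-tree.data` would additionally make the rewrite atomic:
   ```go
   // Open with O_TRUNC so rebuilding a smaller tree cannot leave stale
   // bytes (and an old footer) behind the newly written data.
   f, err := os.OpenFile(treeFilePath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o600)
   if err != nil {
           return nil, fmt.Errorf("opening repair tree file failure: %w", err)
   }
   // f is then handed to the builder/writer as before.
   ```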



##########
banyand/property/db.go:
##########
@@ -216,34 +248,87 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
        return result, nil
 }
 
-func (db *database) loadShard(ctx context.Context, id common.ShardID) (*shard, error) {
+func (db *database) collectGroupShards(groupsMap *map[string]*groupShards, requestedGroups map[string]bool) []*shard {
+       var shards []*shard
+       for groupName, gs := range *groupsMap {
+               if len(requestedGroups) > 0 {
+                       if _, ok := requestedGroups[groupName]; !ok {
+                               continue
+                       }
+               }
+               sLst := gs.shards.Load()
+               if sLst == nil {
+                       continue
+               }
+               shards = append(shards, *sLst...)
+       }
+       return shards
+}
+
+func (db *database) loadShard(ctx context.Context, group string, id common.ShardID) (*shard, error) {
        if db.closed.Load() {
                return nil, errors.New("database is closed")
        }
-       if s, ok := db.getShard(id); ok {
+       if s, ok := db.getShard(group, id); ok {
                return s, nil
        }
        db.mu.Lock()
        defer db.mu.Unlock()
-       if s, ok := db.getShard(id); ok {
+       if s, ok := db.getShard(group, id); ok {
                return s, nil
        }
-       sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger), id, int64(db.flushInterval.Seconds()),
+
+       gs := db.getOrCreateGroupShards(group)
+       sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger),
+               group, id, int64(db.flushInterval.Seconds()),
                int64(db.expireDelete.Seconds()), db.repairBaseDir, db.repairTreeSlotCount)
        if err != nil {
                return nil, err
        }
-       sLst := db.sLst.Load()
+
+       gs.mu.Lock()
+       sLst := gs.shards.Load()
        if sLst == nil {
                sLst = &[]*shard{}
        }
-       *sLst = append(*sLst, sd)
-       db.sLst.Store(sLst)
+       newList := append(*sLst, sd)
+       gs.shards.Store(&newList)
+       gs.mu.Unlock()
        return sd, nil
 }
 
-func (db *database) getShard(id common.ShardID) (*shard, bool) {
-       sLst := db.sLst.Load()
+func (db *database) getOrCreateGroupShards(group string) *groupShards {
+       groupsMap := db.groups.Load()
+       if groupsMap != nil {
+               if gs, ok := (*groupsMap)[group]; ok {
+                       return gs
+               }
+       }
+       gs := &groupShards{
+               group:    group,
+               location: filepath.Join(db.location, group),
+       }
+       if groupsMap == nil {
+               newMap := make(map[string]*groupShards)
+               newMap[group] = gs
+               db.groups.Store(&newMap)
+       } else {
+               (*groupsMap)[group] = gs
+               db.groups.Store(groupsMap)

Review Comment:
   `getOrCreateGroupShards` mutates the existing `map[string]*groupShards` 
in-place (`(*groupsMap)[group] = gs`) and then re-stores the same map pointer. 
Callers like `delete/query/collect/close` iterate over `*groupsMap` without 
holding `db.mu`, so this can trigger `concurrent map iteration and map write` 
panics. Use copy-on-write (allocate a new map, copy entries, add the new group, 
then `Store` the new pointer) or protect all reads with the same lock used for 
writes.
   ```suggestion
                // Use copy-on-write: create a new map, copy existing entries, then add the new group.
                oldMap := *groupsMap
                newMap := make(map[string]*groupShards, len(oldMap)+1)
                for k, v := range oldMap {
                        newMap[k] = v
                }
                newMap[group] = gs
                db.groups.Store(&newMap)
   ```
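   For the lock-based alternative mentioned above, a rough sketch of the reader side, assuming `db.mu` were widened to a `sync.RWMutex` and that `collectGroupShards` loaded the map under the lock itself (names taken from this hunk; not part of the PR):
   ```go
   // Hold the same lock for the entire iteration, because the writer
   // mutates the map in place instead of swapping the pointer.
   db.mu.RLock()
   defer db.mu.RUnlock()
   groupsMap := db.groups.Load()
   if groupsMap == nil {
           return nil
   }
   var shards []*shard
   for groupName, gs := range *groupsMap {
           if len(requestedGroups) > 0 {
                   if _, ok := requestedGroups[groupName]; !ok {
                           continue
                   }
           }
           if sLst := gs.shards.Load(); sLst != nil {
                   shards = append(shards, *sLst...)
           }
   }
   return shards
   ```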



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
