hanahmily commented on code in PR #712:
URL: 
https://github.com/apache/skywalking-banyandb/pull/712#discussion_r2239365417


##########
api/proto/banyandb/property/v1/repair.proto:
##########
@@ -0,0 +1,97 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+package banyandb.property.v1;
+
+import "banyandb/property/v1/property.proto";
+
+option go_package = 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1";
+option java_package = "org.apache.skywalking.banyandb.property.v1";
+
+message TreeSlotSHA {
+  int32 slot = 1;
+  string value = 2;
+}
+
+message TreeLeafNode {
+  // slot_index is the index of the slot in the tree.
+  int32 slot_index = 1;
+  // if the slot is empty, means the server side don't have the slot.
+  bool exists = 2;
+  // if the slot and entity exists, the SHA value of the entity.
+  string entity = 3;
+  string sha = 4;
+}
+
+message TreeSummary {
+  string group = 1;
+  uint32 shard_id = 2;
+  bool tree_found = 3;
+  string root_sha = 4;
+  repeated TreeSlotSHA slot_sha = 5;
+}
+
+message PropertyMissing {
+  string entity = 1;
+}
+
+message DifferTreeSummary {
+  bool tree_found = 1;
+  // if the nodes is empty, mean the server side don't have more tree leaf 
nodes to send.
+  repeated TreeLeafNode nodes = 2;
+}
+
+message PropertySync {
+  bytes id = 1;
+  banyandb.property.v1.Property property = 2;
+  int64 delete_time = 3;
+}
+
+message NoMorePropertySync {}
+
+message RepairRequest {
+  oneof data {
+    // compare stage
+    TreeSummary tree_summary = 11;

Review Comment:
   Why start at 11?



##########
banyand/property/repair.go:
##########
@@ -945,6 +1009,21 @@ func (r *repairScheduler) doRepair() (err error) {
        }
 
        // otherwise, we need to build the trees
+       return r.buildingTree(nil, "", false)
+}
+
+// nolint: contextcheck
+func (r *repairScheduler) buildingTree(shards []common.ShardID, group string, 
force bool) error {
+       if force {
+               r.buildTreeLocker.Lock()
+       } else if !r.buildTreeLocker.TryLock() {
+               // if not forced, we try to lock the build tree locker
+               return nil

Review Comment:
   Consider adding metrics for lock contention to monitor repair conflicts.



##########
banyand/property/repair_gossip.go:
##########
@@ -0,0 +1,696 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+)
+
+type repairGossipBase struct {
+       scheduler *repairScheduler
+}
+
+func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, 
shardID uint32) (repairTreeReader, bool, error) {
+       s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID))
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to load shard %d: %w", 
shardID, err)
+       }
+       tree, err := s.repairState.treeReader(group)
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to get tree reader for 
group %s: %w", group, err)
+       }
+       if tree == nil {
+               // if the tree is nil, but the state file exist, means the 
tree(group) is empty
+               stateExist, err := s.repairState.stateFileExist()
+               if err != nil {
+                       return nil, false, fmt.Errorf("failed to check state 
file existence for group %s: %w", group, err)
+               }
+               // if the tree is nil, it means the tree is no data
+               return &emptyRepairTreeReader{}, stateExist, nil
+       }
+       return tree, true, nil
+}
+
+func (b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group 
string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, 
error) {
+       root, err := reader.read(nil, 1)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read tree root: %w", err)
+       }
+       if len(root) == 0 {
+               return nil, nil, fmt.Errorf("tree root is empty for group %s", 
group)
+       }
+       slots, err := reader.read(root[0], 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read slots for group %s: 
%w", group, err)
+       }
+       result := &propertyv1.TreeSummary{
+               Group:     group,
+               ShardId:   shardID,
+               TreeFound: true,
+               RootSha:   root[0].shaValue,
+               SlotSha:   make([]*propertyv1.TreeSlotSHA, 0, len(slots)),
+       }
+       slotsNodes := make(map[int32]*repairTreeNode, len(slots))
+       for _, s := range slots {
+               result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{
+                       Slot:  s.slotInx,
+                       Value: s.shaValue,
+               })
+               slotsNodes[s.slotInx] = s
+       }
+
+       return result, slotsNodes, nil
+}
+
+func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard 
*shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) {
+       g, n, entity, err := 
syncShard.repairState.parseLeafNodeEntity(leafNodeEntity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to parse leaf node entity 
%s: %w", leafNodeEntity, err)
+       }
+       searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, 
g, n, entityID, entity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to build query from leaf 
node entity %s: %w", leafNodeEntity, err)
+       }
+       queriedProperties, err := syncShard.search(ctx, searchQuery, 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to search properties for 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       var latestProperty *queryProperty
+       for _, queried := range queriedProperties {
+               if latestProperty == nil || queried.timestamp > 
latestProperty.timestamp {
+                       latestProperty = queried
+               }
+       }
+       if latestProperty == nil {
+               return nil, nil, nil
+       }
+       var p propertyv1.Property
+       err = protojson.Unmarshal(latestProperty.source, &p)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to unmarshal property from 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       return latestProperty, &p, nil
+}
+
+type repairGossipClient struct {
+       repairGossipBase
+}
+
+func newRepairGossipClient(s *repairScheduler) *repairGossipClient {
+       return &repairGossipClient{
+               repairGossipBase: repairGossipBase{
+                       scheduler: s,
+               },
+       }
+}
+
+func (r *repairGossipClient) Rev(ctx context.Context, nextNode 
*grpclib.ClientConn, request *propertyv1.PropagationRequest) error {
+       client := propertyv1.NewRepairServiceClient(nextNode)
+       var hasPropertyUpdated bool
+       defer func() {
+               if hasPropertyUpdated {
+                       err := 
r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, 
request.Group, true)
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
rebuild tree for group %s, shard %d", request.Group, request.ShardId)
+                       }
+               }
+       }()
+       reader, found, err := r.getTreeReader(ctx, request.Group, 
request.ShardId)
+       if err != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get 
tree reader on client side: %v", err)
+       }
+       if !found {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group 
%s, shard %d not found on client side", request.Group, request.ShardId)
+       }
+       defer reader.close()
+       summary, clientSlotNodes, err := r.buildTreeSummary(reader, 
request.Group, request.ShardId)
+       if err != nil {
+               // if the tree summary cannot be built, we should abort the 
propagation
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to 
query tree summary on client side: %v", err)
+       }
+
+       stream, err := client.Repair(ctx)

Review Comment:
   Add stream closure: defer stream.CloseSend()



##########
banyand/property/service.go:
##########
@@ -87,6 +88,7 @@ func (s *service) FlagSet() *run.FlagSet {
        flagS.StringVar(&s.repairBuildTreeCron, 
"property-repair-build-tree-cron", "@every 1h", "the cron expression for 
repairing the build tree")
        flagS.DurationVar(&s.repairQuickBuildTreeTime, 
"property-repair-quick-build-tree-time", time.Minute*10,
                "the duration of the quick build tree after operate the 
property")
+       flagS.StringVar(&s.repairTriggerCron, "property-repair-trigger-cron", 
"* 2 * * *", "the cron expression for background repairing the property data")

Review Comment:
   Can we disable the background repair through this flag?



##########
banyand/property/repair_gossip.go:
##########
@@ -0,0 +1,696 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+)
+
+type repairGossipBase struct {
+       scheduler *repairScheduler
+}
+
+func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, 
shardID uint32) (repairTreeReader, bool, error) {
+       s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID))
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to load shard %d: %w", 
shardID, err)
+       }
+       tree, err := s.repairState.treeReader(group)
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to get tree reader for 
group %s: %w", group, err)
+       }
+       if tree == nil {
+               // if the tree is nil, but the state file exist, means the 
tree(group) is empty
+               stateExist, err := s.repairState.stateFileExist()
+               if err != nil {
+                       return nil, false, fmt.Errorf("failed to check state 
file existence for group %s: %w", group, err)
+               }
+               // if the tree is nil, it means the tree is no data
+               return &emptyRepairTreeReader{}, stateExist, nil
+       }
+       return tree, true, nil
+}
+
+func (b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group 
string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, 
error) {
+       root, err := reader.read(nil, 1)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read tree root: %w", err)
+       }
+       if len(root) == 0 {
+               return nil, nil, fmt.Errorf("tree root is empty for group %s", 
group)
+       }
+       slots, err := reader.read(root[0], 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read slots for group %s: 
%w", group, err)
+       }
+       result := &propertyv1.TreeSummary{
+               Group:     group,
+               ShardId:   shardID,
+               TreeFound: true,
+               RootSha:   root[0].shaValue,
+               SlotSha:   make([]*propertyv1.TreeSlotSHA, 0, len(slots)),
+       }
+       slotsNodes := make(map[int32]*repairTreeNode, len(slots))
+       for _, s := range slots {
+               result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{
+                       Slot:  s.slotInx,
+                       Value: s.shaValue,
+               })
+               slotsNodes[s.slotInx] = s
+       }
+
+       return result, slotsNodes, nil
+}
+
+func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard 
*shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) {
+       g, n, entity, err := 
syncShard.repairState.parseLeafNodeEntity(leafNodeEntity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to parse leaf node entity 
%s: %w", leafNodeEntity, err)
+       }
+       searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, 
g, n, entityID, entity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to build query from leaf 
node entity %s: %w", leafNodeEntity, err)
+       }
+       queriedProperties, err := syncShard.search(ctx, searchQuery, 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to search properties for 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       var latestProperty *queryProperty
+       for _, queried := range queriedProperties {
+               if latestProperty == nil || queried.timestamp > 
latestProperty.timestamp {
+                       latestProperty = queried
+               }
+       }
+       if latestProperty == nil {
+               return nil, nil, nil
+       }
+       var p propertyv1.Property
+       err = protojson.Unmarshal(latestProperty.source, &p)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to unmarshal property from 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       return latestProperty, &p, nil
+}
+
+type repairGossipClient struct {
+       repairGossipBase
+}
+
+func newRepairGossipClient(s *repairScheduler) *repairGossipClient {
+       return &repairGossipClient{
+               repairGossipBase: repairGossipBase{
+                       scheduler: s,
+               },
+       }
+}
+
+func (r *repairGossipClient) Rev(ctx context.Context, nextNode 
*grpclib.ClientConn, request *propertyv1.PropagationRequest) error {
+       client := propertyv1.NewRepairServiceClient(nextNode)
+       var hasPropertyUpdated bool
+       defer func() {
+               if hasPropertyUpdated {
+                       err := 
r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, 
request.Group, true)
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
rebuild tree for group %s, shard %d", request.Group, request.ShardId)
+                       }
+               }
+       }()
+       reader, found, err := r.getTreeReader(ctx, request.Group, 
request.ShardId)
+       if err != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get 
tree reader on client side: %v", err)
+       }
+       if !found {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group 
%s, shard %d not found on client side", request.Group, request.ShardId)
+       }
+       defer reader.close()
+       summary, clientSlotNodes, err := r.buildTreeSummary(reader, 
request.Group, request.ShardId)
+       if err != nil {
+               // if the tree summary cannot be built, we should abort the 
propagation
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to 
query tree summary on client side: %v", err)
+       }
+
+       stream, err := client.Repair(ctx)
+       if err != nil {
+               return fmt.Errorf("failed to create repair stream: %w", err)
+       }
+
+       // step 1: send merkle tree summary
+       err = stream.Send(&propertyv1.RepairRequest{
+               Data: &propertyv1.RepairRequest_TreeSummary{
+                       TreeSummary: summary,
+               },
+       })
+       if err != nil {
+               return fmt.Errorf("failed to send tree summary: %w", err)
+       }
+
+       serverTreeSummaryResp, err := stream.Recv()
+       if err != nil {
+               return fmt.Errorf("failed to receive tree summary from server: 
%w", err)
+       }
+       treeSummaryResp, ok := 
serverTreeSummaryResp.Data.(*propertyv1.RepairResponse_DifferTreeSummary)
+       if !ok {
+               return fmt.Errorf("unexpected response type: %T, expected 
DifferTreeSummary", serverTreeSummaryResp.Data)
+       }
+       if !treeSummaryResp.DifferTreeSummary.TreeFound {
+               // if the tree is not found, we should abort the propagation
+               return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group 
%s not found on server side", request.Group)
+       }
+       if len(treeSummaryResp.DifferTreeSummary.Nodes) == 0 {
+               // there no different nodes, we can skip repair
+               return nil
+       }
+
+       // step 2: check with the server for different leaf nodes
+       syncShard, err := r.scheduler.db.loadShard(ctx, 
common.ShardID(request.ShardId))
+       if err != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load 
failure on client side: %v", request.ShardId, err)
+       }
+       leafNodeCache := make(map[int32]map[string]*repairTreeNode)
+       differTreeSummary := treeSummaryResp.DifferTreeSummary
+keepLeafCompare:
+       r.handleDifferSummaryFromServer(ctx, stream, differTreeSummary, reader, 
syncShard, clientSlotNodes, leafNodeCache)
+keepReceiveServerMsg:

Review Comment:
   "goto" is not recommended.



##########
banyand/property/repair_gossip.go:
##########
@@ -0,0 +1,696 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+)
+
+type repairGossipBase struct {
+       scheduler *repairScheduler
+}
+
+func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, 
shardID uint32) (repairTreeReader, bool, error) {
+       s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID))
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to load shard %d: %w", 
shardID, err)
+       }
+       tree, err := s.repairState.treeReader(group)
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to get tree reader for 
group %s: %w", group, err)
+       }
+       if tree == nil {
+               // if the tree is nil, but the state file exist, means the 
tree(group) is empty
+               stateExist, err := s.repairState.stateFileExist()
+               if err != nil {
+                       return nil, false, fmt.Errorf("failed to check state 
file existence for group %s: %w", group, err)
+               }
+               // if the tree is nil, it means the tree is no data
+               return &emptyRepairTreeReader{}, stateExist, nil
+       }
+       return tree, true, nil
+}
+
+func (b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group 
string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, 
error) {
+       root, err := reader.read(nil, 1)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read tree root: %w", err)
+       }
+       if len(root) == 0 {
+               return nil, nil, fmt.Errorf("tree root is empty for group %s", 
group)
+       }
+       slots, err := reader.read(root[0], 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read slots for group %s: 
%w", group, err)
+       }
+       result := &propertyv1.TreeSummary{
+               Group:     group,
+               ShardId:   shardID,
+               TreeFound: true,
+               RootSha:   root[0].shaValue,
+               SlotSha:   make([]*propertyv1.TreeSlotSHA, 0, len(slots)),
+       }
+       slotsNodes := make(map[int32]*repairTreeNode, len(slots))
+       for _, s := range slots {
+               result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{
+                       Slot:  s.slotInx,
+                       Value: s.shaValue,
+               })
+               slotsNodes[s.slotInx] = s
+       }
+
+       return result, slotsNodes, nil
+}
+
+func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard 
*shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) {
+       g, n, entity, err := 
syncShard.repairState.parseLeafNodeEntity(leafNodeEntity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to parse leaf node entity 
%s: %w", leafNodeEntity, err)
+       }
+       searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, 
g, n, entityID, entity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to build query from leaf 
node entity %s: %w", leafNodeEntity, err)
+       }
+       queriedProperties, err := syncShard.search(ctx, searchQuery, 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to search properties for 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       var latestProperty *queryProperty
+       for _, queried := range queriedProperties {
+               if latestProperty == nil || queried.timestamp > 
latestProperty.timestamp {
+                       latestProperty = queried
+               }
+       }
+       if latestProperty == nil {
+               return nil, nil, nil
+       }
+       var p propertyv1.Property
+       err = protojson.Unmarshal(latestProperty.source, &p)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to unmarshal property from 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       return latestProperty, &p, nil
+}
+
+type repairGossipClient struct {
+       repairGossipBase
+}
+
+func newRepairGossipClient(s *repairScheduler) *repairGossipClient {
+       return &repairGossipClient{
+               repairGossipBase: repairGossipBase{
+                       scheduler: s,
+               },
+       }
+}
+
+func (r *repairGossipClient) Rev(ctx context.Context, nextNode 
*grpclib.ClientConn, request *propertyv1.PropagationRequest) error {
+       client := propertyv1.NewRepairServiceClient(nextNode)
+       var hasPropertyUpdated bool
+       defer func() {
+               if hasPropertyUpdated {
+                       err := 
r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, 
request.Group, true)
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
rebuild tree for group %s, shard %d", request.Group, request.ShardId)
+                       }
+               }
+       }()
+       reader, found, err := r.getTreeReader(ctx, request.Group, 
request.ShardId)
+       if err != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get 
tree reader on client side: %v", err)
+       }
+       if !found {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group 
%s, shard %d not found on client side", request.Group, request.ShardId)
+       }
+       defer reader.close()
+       summary, clientSlotNodes, err := r.buildTreeSummary(reader, 
request.Group, request.ShardId)
+       if err != nil {
+               // if the tree summary cannot be built, we should abort the 
propagation
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to 
query tree summary on client side: %v", err)
+       }
+
+       stream, err := client.Repair(ctx)
+       if err != nil {
+               return fmt.Errorf("failed to create repair stream: %w", err)
+       }
+
+       // step 1: send merkle tree summary
+       err = stream.Send(&propertyv1.RepairRequest{
+               Data: &propertyv1.RepairRequest_TreeSummary{
+                       TreeSummary: summary,
+               },
+       })
+       if err != nil {
+               return fmt.Errorf("failed to send tree summary: %w", err)
+       }
+
+       serverTreeSummaryResp, err := stream.Recv()
+       if err != nil {
+               return fmt.Errorf("failed to receive tree summary from server: 
%w", err)
+       }
+       treeSummaryResp, ok := 
serverTreeSummaryResp.Data.(*propertyv1.RepairResponse_DifferTreeSummary)
+       if !ok {
+               return fmt.Errorf("unexpected response type: %T, expected 
DifferTreeSummary", serverTreeSummaryResp.Data)
+       }
+       if !treeSummaryResp.DifferTreeSummary.TreeFound {
+               // if the tree is not found, we should abort the propagation
+               return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group 
%s not found on server side", request.Group)
+       }
+       if len(treeSummaryResp.DifferTreeSummary.Nodes) == 0 {
+               // there no different nodes, we can skip repair
+               return nil
+       }
+
+       // step 2: check with the server for different leaf nodes
+       syncShard, err := r.scheduler.db.loadShard(ctx, 
common.ShardID(request.ShardId))
+       if err != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load 
failure on client side: %v", request.ShardId, err)
+       }
+       leafNodeCache := make(map[int32]map[string]*repairTreeNode)
+       differTreeSummary := treeSummaryResp.DifferTreeSummary
+keepLeafCompare:
+       r.handleDifferSummaryFromServer(ctx, stream, differTreeSummary, reader, 
syncShard, clientSlotNodes, leafNodeCache)
+keepReceiveServerMsg:
+       // step 3: keep receiving messages from the server
+       // if the server still sending different nodes, we should keep reading 
them
+       // if the server sends a PropertySync, we should repair the property 
and send the newer property back to the server if needed
+       serverTreeSummaryResp, err = stream.Recv()
+       if err != nil {
+               // if the server side has no more different nodes, we can stop 
sync
+               if errors.Is(err, io.EOF) {
+                       return nil
+               }
+               return fmt.Errorf("failed to keep receive tree summary from 
server: %w", err)
+       }
+       switch respData := serverTreeSummaryResp.Data.(type) {
+       case *propertyv1.RepairResponse_DifferTreeSummary:
+               // keep reading the tree summary until there are no more 
different nodes
+               differTreeSummary = respData.DifferTreeSummary
+               goto keepLeafCompare
+       case *propertyv1.RepairResponse_PropertySync:
+               sync := respData.PropertySync
+               updated, newer, err := syncShard.repair(ctx, sync.Id, 
sync.Property, sync.DeleteTime)
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to repair 
property %s", sync.Id)
+                       r.scheduler.metrics.totalRepairFailedCount.Inc(1, 
request.Group, fmt.Sprintf("%d", request.ShardId))
+                       goto keepReceiveServerMsg
+               }
+               if updated {
+                       r.scheduler.metrics.totalRepairSuccessCount.Inc(1, 
request.Group, fmt.Sprintf("%d", request.ShardId))
+               }
+               // if the property hasn't been updated, and the newer property 
is not nil,
+               // which means the property is newer than the server side,
+               if !updated && newer != nil {
+                       var p propertyv1.Property
+                       err = protojson.Unmarshal(newer.source, &p)
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
unmarshal property from db by entity %s", newer.id)
+                               goto keepReceiveServerMsg
+                       }
+                       // send the newer property to the server
+                       err = stream.Send(&propertyv1.RepairRequest{
+                               Data: &propertyv1.RepairRequest_PropertySync{
+                                       PropertySync: &propertyv1.PropertySync{
+                                               Id:         newer.id,
+                                               Property:   &p,
+                                               DeleteTime: newer.deleteTime,
+                                       },
+                               },
+                       })
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
send newer property sync response to server, entity: %s", newer.id)
+                       }
+               }
+               goto keepReceiveServerMsg
+       default:
+               // if the response is not a DifferTreeSummary or PropertySync, 
then we should ignore it
+               r.scheduler.l.Warn().Msgf("unexpected response type: %T, 
expected DifferTreeSummary or PropertySync", respData)
+               goto keepReceiveServerMsg
+       }
+}
+
+func (r *repairGossipClient) sendPropertyMissing(stream 
grpclib.BidiStreamingClient[propertyv1.RepairRequest, 
propertyv1.RepairResponse], entity string) {
+       err := stream.Send(&propertyv1.RepairRequest{
+               Data: &propertyv1.RepairRequest_PropertyMissing{
+                       PropertyMissing: &propertyv1.PropertyMissing{
+                               Entity: entity,
+                       },
+               },
+       })
+       if err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to send property 
missing response to client, entity: %s", entity)
+       }
+}
+
+func (r *repairGossipClient) handleDifferSummaryFromServer(
+       ctx context.Context,
+       stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+       differTreeSummary *propertyv1.DifferTreeSummary,
+       reader repairTreeReader,
+       syncShard *shard,
+       clientSlotNodes map[int32]*repairTreeNode,
+       leafNodeCache map[int32]map[string]*repairTreeNode,
+) {
+       // if their no more different nodes, means the client side could be 
send the no more property sync request to notify the server
+       if len(differTreeSummary.Nodes) == 0 {
+               err := stream.Send(&propertyv1.RepairRequest{
+                       Data: &propertyv1.RepairRequest_NoMorePropertySync{
+                               NoMorePropertySync: 
&propertyv1.NoMorePropertySync{},
+                       },
+               })
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to send no 
more property sync request to server")
+                       return
+               }
+       }
+       // keep reading the tree summary until there are no more different nodes
+       for _, node := range differTreeSummary.Nodes {
+               // if the repair node doesn't exist in the server side, then 
should send all the real property data to server

Review Comment:
   Add a context check inside this loop so it stops once the context is cancelled:
   
   ```go
   select {
   case <-ctx.Done():
       return ctx.Err()
   default:
       // continue processing
   }
   ```



##########
banyand/property/repair_gossip.go:
##########
@@ -0,0 +1,696 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+)
+
+type repairGossipBase struct {
+       scheduler *repairScheduler
+}
+
+func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, 
shardID uint32) (repairTreeReader, bool, error) {
+       s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID))
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to load shard %d: %w", 
shardID, err)
+       }
+       tree, err := s.repairState.treeReader(group)
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to get tree reader for 
group %s: %w", group, err)
+       }
+       if tree == nil {
+               // if the tree is nil, but the state file exist, means the 
tree(group) is empty
+               stateExist, err := s.repairState.stateFileExist()
+               if err != nil {
+                       return nil, false, fmt.Errorf("failed to check state 
file existence for group %s: %w", group, err)
+               }
+               // if the tree is nil, it means the tree is no data
+               return &emptyRepairTreeReader{}, stateExist, nil
+       }
+       return tree, true, nil
+}
+
+func (b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group 
string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, 
error) {
+       root, err := reader.read(nil, 1)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read tree root: %w", err)
+       }
+       if len(root) == 0 {
+               return nil, nil, fmt.Errorf("tree root is empty for group %s", 
group)
+       }
+       slots, err := reader.read(root[0], 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read slots for group %s: 
%w", group, err)
+       }
+       result := &propertyv1.TreeSummary{
+               Group:     group,
+               ShardId:   shardID,
+               TreeFound: true,
+               RootSha:   root[0].shaValue,
+               SlotSha:   make([]*propertyv1.TreeSlotSHA, 0, len(slots)),
+       }
+       slotsNodes := make(map[int32]*repairTreeNode, len(slots))
+       for _, s := range slots {
+               result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{
+                       Slot:  s.slotInx,
+                       Value: s.shaValue,
+               })
+               slotsNodes[s.slotInx] = s
+       }
+
+       return result, slotsNodes, nil
+}
+
+func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard 
*shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) {
+       g, n, entity, err := 
syncShard.repairState.parseLeafNodeEntity(leafNodeEntity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to parse leaf node entity 
%s: %w", leafNodeEntity, err)
+       }
+       searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, 
g, n, entityID, entity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to build query from leaf 
node entity %s: %w", leafNodeEntity, err)
+       }
+       queriedProperties, err := syncShard.search(ctx, searchQuery, 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to search properties for 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       var latestProperty *queryProperty
+       for _, queried := range queriedProperties {
+               if latestProperty == nil || queried.timestamp > 
latestProperty.timestamp {
+                       latestProperty = queried
+               }
+       }
+       if latestProperty == nil {
+               return nil, nil, nil
+       }
+       var p propertyv1.Property
+       err = protojson.Unmarshal(latestProperty.source, &p)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to unmarshal property from 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       return latestProperty, &p, nil
+}
+
+type repairGossipClient struct {
+       repairGossipBase
+}
+
+func newRepairGossipClient(s *repairScheduler) *repairGossipClient {
+       return &repairGossipClient{
+               repairGossipBase: repairGossipBase{
+                       scheduler: s,
+               },
+       }
+}
+
+func (r *repairGossipClient) Rev(ctx context.Context, nextNode 
*grpclib.ClientConn, request *propertyv1.PropagationRequest) error {
+       client := propertyv1.NewRepairServiceClient(nextNode)
+       var hasPropertyUpdated bool

Review Comment:
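   `hasPropertyUpdated` is always false: it is never assigned after its
   declaration, so the deferred tree rebuild never runs.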



##########
api/proto/banyandb/property/v1/repair.proto:
##########
@@ -0,0 +1,97 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+package banyandb.property.v1;
+
+import "banyandb/property/v1/property.proto";
+
+option go_package = 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1";
+option java_package = "org.apache.skywalking.banyandb.property.v1";
+
+message TreeSlotSHA {
+  int32 slot = 1;
+  string value = 2;
+}
+
+message TreeLeafNode {
+  // slot_index is the index of the slot in the tree.
+  int32 slot_index = 1;
+  // if the slot is empty, means the server side don't have the slot.
+  bool exists = 2;
+  // if the slot and entity exists, the SHA value of the entity.
+  string entity = 3;
+  string sha = 4;
+}
+
+message TreeSummary {
+  string group = 1;
+  uint32 shard_id = 2;
+  bool tree_found = 3;
+  string root_sha = 4;
+  repeated TreeSlotSHA slot_sha = 5;
+}
+
+message PropertyMissing {
+  string entity = 1;
+}
+
+message DifferTreeSummary {
+  bool tree_found = 1;
+  // if the nodes is empty, mean the server side don't have more tree leaf 
nodes to send.
+  repeated TreeLeafNode nodes = 2;
+}
+
+message PropertySync {
+  bytes id = 1;
+  banyandb.property.v1.Property property = 2;
+  int64 delete_time = 3;
+}
+
+message NoMorePropertySync {}
+
+message RepairRequest {
+  oneof data {
+    // compare stage
+    TreeSummary tree_summary = 11;
+    // repair stage
+    // case 1: client missing but server existing
+    PropertyMissing property_missing = 12;
+    // case 2: client existing but server missing
+    // case 3: SHA value mismatches
+    PropertySync property_sync = 13;
+    // if client side is already send all the properties(missing or property 
sync)
+    // which means the client side will not sending more properties to sync, 
server side should close the stream.
+    NoMorePropertySync no_more_property_sync = 14;

Review Comment:
   This message does not seem to be needed. The client can simply close the
   stream; the server will then receive an `io.EOF` and can close the stream
   gracefully.



##########
banyand/property/repair_gossip.go:
##########
@@ -0,0 +1,696 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+)
+
+type repairGossipBase struct {
+       scheduler *repairScheduler
+}
+
+func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, 
shardID uint32) (repairTreeReader, bool, error) {
+       s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID))
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to load shard %d: %w", 
shardID, err)
+       }
+       tree, err := s.repairState.treeReader(group)
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to get tree reader for 
group %s: %w", group, err)
+       }
+       if tree == nil {
+               // if the tree is nil, but the state file exist, means the 
tree(group) is empty
+               stateExist, err := s.repairState.stateFileExist()
+               if err != nil {
+                       return nil, false, fmt.Errorf("failed to check state 
file existence for group %s: %w", group, err)
+               }
+               // if the tree is nil, it means the tree is no data
+               return &emptyRepairTreeReader{}, stateExist, nil
+       }
+       return tree, true, nil
+}
+
+func (b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group 
string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, 
error) {
+       root, err := reader.read(nil, 1)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read tree root: %w", err)
+       }
+       if len(root) == 0 {
+               return nil, nil, fmt.Errorf("tree root is empty for group %s", 
group)
+       }
+       slots, err := reader.read(root[0], 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read slots for group %s: 
%w", group, err)
+       }
+       result := &propertyv1.TreeSummary{
+               Group:     group,
+               ShardId:   shardID,
+               TreeFound: true,
+               RootSha:   root[0].shaValue,
+               SlotSha:   make([]*propertyv1.TreeSlotSHA, 0, len(slots)),
+       }
+       slotsNodes := make(map[int32]*repairTreeNode, len(slots))
+       for _, s := range slots {
+               result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{
+                       Slot:  s.slotInx,
+                       Value: s.shaValue,
+               })
+               slotsNodes[s.slotInx] = s
+       }
+
+       return result, slotsNodes, nil
+}
+
+func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard 
*shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) {
+       g, n, entity, err := 
syncShard.repairState.parseLeafNodeEntity(leafNodeEntity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to parse leaf node entity 
%s: %w", leafNodeEntity, err)
+       }
+       searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, 
g, n, entityID, entity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to build query from leaf 
node entity %s: %w", leafNodeEntity, err)
+       }
+       queriedProperties, err := syncShard.search(ctx, searchQuery, 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to search properties for 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       var latestProperty *queryProperty
+       for _, queried := range queriedProperties {
+               if latestProperty == nil || queried.timestamp > 
latestProperty.timestamp {
+                       latestProperty = queried
+               }
+       }
+       if latestProperty == nil {
+               return nil, nil, nil
+       }
+       var p propertyv1.Property
+       err = protojson.Unmarshal(latestProperty.source, &p)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to unmarshal property from 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       return latestProperty, &p, nil
+}
+
+type repairGossipClient struct {
+       repairGossipBase
+}
+
+func newRepairGossipClient(s *repairScheduler) *repairGossipClient {
+       return &repairGossipClient{
+               repairGossipBase: repairGossipBase{
+                       scheduler: s,
+               },
+       }
+}
+
+func (r *repairGossipClient) Rev(ctx context.Context, nextNode 
*grpclib.ClientConn, request *propertyv1.PropagationRequest) error {
+       client := propertyv1.NewRepairServiceClient(nextNode)
+       var hasPropertyUpdated bool
+       defer func() {
+               if hasPropertyUpdated {
+                       err := 
r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, 
request.Group, true)
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
rebuild tree for group %s, shard %d", request.Group, request.ShardId)
+                       }
+               }
+       }()
+       reader, found, err := r.getTreeReader(ctx, request.Group, 
request.ShardId)
+       if err != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get 
tree reader on client side: %v", err)
+       }
+       if !found {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group 
%s, shard %d not found on client side", request.Group, request.ShardId)
+       }
+       defer reader.close()
+       summary, clientSlotNodes, err := r.buildTreeSummary(reader, 
request.Group, request.ShardId)
+       if err != nil {
+               // if the tree summary cannot be built, we should abort the 
propagation
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to 
query tree summary on client side: %v", err)
+       }
+
+       stream, err := client.Repair(ctx)
+       if err != nil {
+               return fmt.Errorf("failed to create repair stream: %w", err)
+       }
+
+       // step 1: send merkle tree summary
+       err = stream.Send(&propertyv1.RepairRequest{
+               Data: &propertyv1.RepairRequest_TreeSummary{
+                       TreeSummary: summary,
+               },
+       })
+       if err != nil {
+               return fmt.Errorf("failed to send tree summary: %w", err)
+       }
+
+       serverTreeSummaryResp, err := stream.Recv()
+       if err != nil {
+               return fmt.Errorf("failed to receive tree summary from server: 
%w", err)
+       }
+       treeSummaryResp, ok := 
serverTreeSummaryResp.Data.(*propertyv1.RepairResponse_DifferTreeSummary)
+       if !ok {
+               return fmt.Errorf("unexpected response type: %T, expected 
DifferTreeSummary", serverTreeSummaryResp.Data)
+       }
+       if !treeSummaryResp.DifferTreeSummary.TreeFound {
+               // if the tree is not found, we should abort the propagation
+               return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group 
%s not found on server side", request.Group)
+       }
+       if len(treeSummaryResp.DifferTreeSummary.Nodes) == 0 {
+               // there no different nodes, we can skip repair
+               return nil
+       }
+
+       // step 2: check with the server for different leaf nodes
+       syncShard, err := r.scheduler.db.loadShard(ctx, 
common.ShardID(request.ShardId))
+       if err != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load 
failure on client side: %v", request.ShardId, err)
+       }
+       leafNodeCache := make(map[int32]map[string]*repairTreeNode)

Review Comment:
   Since the tree's leaves are sorted, consider using a streaming pattern to
   compare the leaves of the two nodes.



##########
banyand/property/repair_gossip.go:
##########
@@ -0,0 +1,696 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+)
+
+type repairGossipBase struct {
+       scheduler *repairScheduler
+}
+
+func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, 
shardID uint32) (repairTreeReader, bool, error) {
+       s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID))
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to load shard %d: %w", 
shardID, err)
+       }
+       tree, err := s.repairState.treeReader(group)
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to get tree reader for 
group %s: %w", group, err)
+       }
+       if tree == nil {
+               // if the tree is nil, but the state file exist, means the 
tree(group) is empty
+               stateExist, err := s.repairState.stateFileExist()
+               if err != nil {
+                       return nil, false, fmt.Errorf("failed to check state 
file existence for group %s: %w", group, err)
+               }
+               // if the tree is nil, it means the tree is no data
+               return &emptyRepairTreeReader{}, stateExist, nil
+       }
+       return tree, true, nil
+}
+
+func (b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group 
string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, 
error) {
+       root, err := reader.read(nil, 1)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read tree root: %w", err)
+       }
+       if len(root) == 0 {
+               return nil, nil, fmt.Errorf("tree root is empty for group %s", 
group)
+       }
+       slots, err := reader.read(root[0], 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to read slots for group %s: 
%w", group, err)
+       }
+       result := &propertyv1.TreeSummary{
+               Group:     group,
+               ShardId:   shardID,
+               TreeFound: true,
+               RootSha:   root[0].shaValue,
+               SlotSha:   make([]*propertyv1.TreeSlotSHA, 0, len(slots)),
+       }
+       slotsNodes := make(map[int32]*repairTreeNode, len(slots))
+       for _, s := range slots {
+               result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{
+                       Slot:  s.slotInx,
+                       Value: s.shaValue,
+               })
+               slotsNodes[s.slotInx] = s
+       }
+
+       return result, slotsNodes, nil
+}
+
+func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard 
*shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) {
+       g, n, entity, err := 
syncShard.repairState.parseLeafNodeEntity(leafNodeEntity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to parse leaf node entity 
%s: %w", leafNodeEntity, err)
+       }
+       searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, 
g, n, entityID, entity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to build query from leaf 
node entity %s: %w", leafNodeEntity, err)
+       }
+       queriedProperties, err := syncShard.search(ctx, searchQuery, 100)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to search properties for 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       var latestProperty *queryProperty
+       for _, queried := range queriedProperties {
+               if latestProperty == nil || queried.timestamp > 
latestProperty.timestamp {
+                       latestProperty = queried
+               }
+       }
+       if latestProperty == nil {
+               return nil, nil, nil
+       }
+       var p propertyv1.Property
+       err = protojson.Unmarshal(latestProperty.source, &p)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to unmarshal property from 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       return latestProperty, &p, nil
+}
+
+type repairGossipClient struct {
+       repairGossipBase
+}
+
+func newRepairGossipClient(s *repairScheduler) *repairGossipClient {
+       return &repairGossipClient{
+               repairGossipBase: repairGossipBase{
+                       scheduler: s,
+               },
+       }
+}
+
+func (r *repairGossipClient) Rev(ctx context.Context, nextNode 
*grpclib.ClientConn, request *propertyv1.PropagationRequest) error {
+       client := propertyv1.NewRepairServiceClient(nextNode)
+       var hasPropertyUpdated bool
+       defer func() {
+               if hasPropertyUpdated {
+                       err := 
r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, 
request.Group, true)
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
rebuild tree for group %s, shard %d", request.Group, request.ShardId)
+                       }
+               }
+       }()
+       reader, found, err := r.getTreeReader(ctx, request.Group, 
request.ShardId)
+       if err != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get 
tree reader on client side: %v", err)
+       }
+       if !found {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group 
%s, shard %d not found on client side", request.Group, request.ShardId)
+       }
+       defer reader.close()
+       summary, clientSlotNodes, err := r.buildTreeSummary(reader, 
request.Group, request.ShardId)
+       if err != nil {
+               // if the tree summary cannot be built, we should abort the 
propagation
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to 
query tree summary on client side: %v", err)
+       }
+
+       stream, err := client.Repair(ctx)
+       if err != nil {
+               return fmt.Errorf("failed to create repair stream: %w", err)
+       }
+
+       // step 1: send merkle tree summary
+       err = stream.Send(&propertyv1.RepairRequest{
+               Data: &propertyv1.RepairRequest_TreeSummary{
+                       TreeSummary: summary,
+               },
+       })
+       if err != nil {
+               return fmt.Errorf("failed to send tree summary: %w", err)
+       }
+
+       serverTreeSummaryResp, err := stream.Recv()
+       if err != nil {
+               return fmt.Errorf("failed to receive tree summary from server: 
%w", err)
+       }
+       treeSummaryResp, ok := 
serverTreeSummaryResp.Data.(*propertyv1.RepairResponse_DifferTreeSummary)
+       if !ok {
+               return fmt.Errorf("unexpected response type: %T, expected 
DifferTreeSummary", serverTreeSummaryResp.Data)
+       }
+       if !treeSummaryResp.DifferTreeSummary.TreeFound {
+               // if the tree is not found, we should abort the propagation
+               return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group 
%s not found on server side", request.Group)
+       }
+       if len(treeSummaryResp.DifferTreeSummary.Nodes) == 0 {
+               // there no different nodes, we can skip repair
+               return nil
+       }
+
+       // step 2: check with the server for different leaf nodes
+       syncShard, err := r.scheduler.db.loadShard(ctx, 
common.ShardID(request.ShardId))
+       if err != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load 
failure on client side: %v", request.ShardId, err)
+       }
+       leafNodeCache := make(map[int32]map[string]*repairTreeNode)
+       differTreeSummary := treeSummaryResp.DifferTreeSummary
+keepLeafCompare:
+       r.handleDifferSummaryFromServer(ctx, stream, differTreeSummary, reader, 
syncShard, clientSlotNodes, leafNodeCache)
+keepReceiveServerMsg:
+       // step 3: keep receiving messages from the server
+       // if the server still sending different nodes, we should keep reading 
them
+       // if the server sends a PropertySync, we should repair the property 
and send the newer property back to the server if needed
+       serverTreeSummaryResp, err = stream.Recv()
+       if err != nil {
+               // if the server side has no more different nodes, we can stop 
sync
+               if errors.Is(err, io.EOF) {
+                       return nil
+               }
+               return fmt.Errorf("failed to keep receive tree summary from 
server: %w", err)
+       }
+       switch respData := serverTreeSummaryResp.Data.(type) {
+       case *propertyv1.RepairResponse_DifferTreeSummary:
+               // keep reading the tree summary until there are no more 
different nodes
+               differTreeSummary = respData.DifferTreeSummary
+               goto keepLeafCompare
+       case *propertyv1.RepairResponse_PropertySync:
+               sync := respData.PropertySync
+               updated, newer, err := syncShard.repair(ctx, sync.Id, 
sync.Property, sync.DeleteTime)
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to repair 
property %s", sync.Id)
+                       r.scheduler.metrics.totalRepairFailedCount.Inc(1, 
request.Group, fmt.Sprintf("%d", request.ShardId))
+                       goto keepReceiveServerMsg
+               }
+               if updated {
+                       r.scheduler.metrics.totalRepairSuccessCount.Inc(1, 
request.Group, fmt.Sprintf("%d", request.ShardId))
+               }
+               // if the property hasn't been updated, and the newer property 
is not nil,
+               // which means the property is newer than the server side,
+               if !updated && newer != nil {
+                       var p propertyv1.Property
+                       err = protojson.Unmarshal(newer.source, &p)
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
unmarshal property from db by entity %s", newer.id)
+                               goto keepReceiveServerMsg
+                       }
+                       // send the newer property to the server
+                       err = stream.Send(&propertyv1.RepairRequest{
+                               Data: &propertyv1.RepairRequest_PropertySync{
+                                       PropertySync: &propertyv1.PropertySync{
+                                               Id:         newer.id,
+                                               Property:   &p,
+                                               DeleteTime: newer.deleteTime,
+                                       },
+                               },
+                       })
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
send newer property sync response to server, entity: %s", newer.id)
+                       }
+               }
+               goto keepReceiveServerMsg
+       default:
+               // if the response is not a DifferTreeSummary or PropertySync, 
then we should ignore it
+               r.scheduler.l.Warn().Msgf("unexpected response type: %T, 
expected DifferTreeSummary or PropertySync", respData)
+               goto keepReceiveServerMsg
+       }
+}
+
+func (r *repairGossipClient) sendPropertyMissing(stream 
grpclib.BidiStreamingClient[propertyv1.RepairRequest, 
propertyv1.RepairResponse], entity string) {
+       err := stream.Send(&propertyv1.RepairRequest{
+               Data: &propertyv1.RepairRequest_PropertyMissing{
+                       PropertyMissing: &propertyv1.PropertyMissing{
+                               Entity: entity,
+                       },
+               },
+       })
+       if err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to send property 
missing response to client, entity: %s", entity)
+       }
+}
+
+func (r *repairGossipClient) handleDifferSummaryFromServer(
+       ctx context.Context,
+       stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+       differTreeSummary *propertyv1.DifferTreeSummary,
+       reader repairTreeReader,
+       syncShard *shard,
+       clientSlotNodes map[int32]*repairTreeNode,
+       leafNodeCache map[int32]map[string]*repairTreeNode,
+) {
+       // if their no more different nodes, means the client side could be 
send the no more property sync request to notify the server
+       if len(differTreeSummary.Nodes) == 0 {
+               err := stream.Send(&propertyv1.RepairRequest{
+                       Data: &propertyv1.RepairRequest_NoMorePropertySync{
+                               NoMorePropertySync: 
&propertyv1.NoMorePropertySync{},
+                       },
+               })
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to send no 
more property sync request to server")
+                       return
+               }
+       }
+       // keep reading the tree summary until there are no more different nodes
+       for _, node := range differTreeSummary.Nodes {
+               // if the repair node doesn't exist in the server side, then 
should send all the real property data to server
+               if !node.Exists {
+                       clientSlotNode, exist := clientSlotNodes[node.SlotIndex]
+                       if !exist {
+                               r.scheduler.l.Warn().Msgf("client slot %d not 
exist", node.SlotIndex)
+                               continue
+                       }
+                       // read the leaf nodes from the client side
+               keepLeafNodesReading:
+                       leafNodes, err := reader.read(clientSlotNode, 10)
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
read leaf nodes from client side")
+                               continue
+                       }
+                       if len(leafNodes) == 0 {
+                               continue
+                       }
+                       // reading the real property data from the leaf nodes 
and sending to the server
+                       for _, leafNode := range leafNodes {
+                               property, p, err := r.queryProperty(ctx, 
syncShard, leafNode.entity)
+                               if err != nil {
+                                       
r.scheduler.l.Warn().Err(err).Msgf("failed to query property for leaf node 
entity %s", leafNode.entity)
+                                       continue
+                               }
+                               if property != nil {
+                                       // send the property to the server
+                                       err = 
stream.Send(&propertyv1.RepairRequest{
+                                               Data: 
&propertyv1.RepairRequest_PropertySync{
+                                                       PropertySync: 
&propertyv1.PropertySync{
+                                                               Id:         
property.id,
+                                                               Property:   p,
+                                                               DeleteTime: 
property.deleteTime,
+                                                       },
+                                               },
+                                       })
+                                       if err != nil {
+                                               
r.scheduler.l.Warn().Err(err).Msgf("failed to send property sync response to 
client, entity: %s", leafNode.entity)
+                                       }
+                               }
+                       }
+                       // continue reading leaf nodes for the same slot until 
there are no more
+                       goto keepLeafNodesReading
+               }
+
+               slotNodes, slotNodesExist := clientSlotNodes[node.SlotIndex]
+               // if slot not exists in client side, then the client should 
ask the server for the property data of leaf nodes
+               if !slotNodesExist {
+                       r.sendPropertyMissing(stream, node.Entity)
+                       continue
+               }
+               // check the leaf node if exist in the client side or not
+               cache, cacheExist := leafNodeCache[node.SlotIndex]
+               if !cacheExist {
+                       cache = make(map[string]*repairTreeNode)
+                       leafNodeCache[node.SlotIndex] = cache
+               }
+               clientLeafNode, clientLeafNodeExist, err := 
r.findExistingLeafNode(cache, reader, slotNodes, node.Entity)
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to find 
existing leaf node for entity %s", node.Entity)
+                       continue
+               }
+               if !clientLeafNodeExist {
+                       r.sendPropertyMissing(stream, node.Entity)
+                       continue
+               }
+               // if the client leaf node SHA is the same as the server leaf 
node SHA, then we can skip it
+               if clientLeafNode.shaValue == node.Sha {
+                       continue
+               }
+               property, p, err := r.queryProperty(ctx, syncShard, 
clientLeafNode.entity)
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to query 
property for leaf node entity %s", clientLeafNode.entity)
+                       continue
+               }
+               if property == nil {
+                       continue
+               }
+               // send the property to the server
+               err = stream.Send(&propertyv1.RepairRequest{
+                       Data: &propertyv1.RepairRequest_PropertySync{
+                               PropertySync: &propertyv1.PropertySync{
+                                       Id:         GetPropertyID(p),
+                                       Property:   p,
+                                       DeleteTime: property.deleteTime,
+                               },
+                       },
+               })
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to send 
property sync request to server, entity: %s", clientLeafNode.entity)
+                       continue
+               }
+       }
+}
+
+func (r *repairGossipClient) findExistingLeafNode(
+       cache map[string]*repairTreeNode,
+       reader repairTreeReader,
+       parent *repairTreeNode,
+       entity string,
+) (*repairTreeNode, bool, error) {
+       // check the cache first
+       if node, exist := cache[entity]; exist {
+               return node, true, nil
+       }
+       // if not found in the cache, read from the tree
+treeReader:
+       leafNodes, err := reader.read(parent, 10)
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to read tree for entity 
%s: %w", entity, err)
+       }
+       if len(leafNodes) == 0 {
+               return nil, false, nil
+       }
+       for _, leafNode := range leafNodes {
+               cache[leafNode.entity] = leafNode
+               if leafNode.entity == entity {
+                       // if the leaf node is found, cache it and return
+                       cache[entity] = leafNode
+                       return leafNode, true, nil
+               }
+       }
+       goto treeReader
+}
+
+// repairGossipServer embeds the generated
+// propertyv1.UnimplementedRepairServiceServer together with repairGossipBase,
+// which carries the repair scheduler set by newRepairGossipServer.
+type repairGossipServer struct {
+       propertyv1.UnimplementedRepairServiceServer
+       repairGossipBase
+}
+
+func newRepairGossipServer(s *repairScheduler) *repairGossipServer {
+       return &repairGossipServer{
+               repairGossipBase: repairGossipBase{
+                       scheduler: s,

Review Comment:
   Shared state in repairScheduler could be accessed concurrently without 
protection



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to