hanahmily commented on code in PR #712: URL: https://github.com/apache/skywalking-banyandb/pull/712#discussion_r2239365417
########## api/proto/banyandb/property/v1/repair.proto: ########## @@ -0,0 +1,97 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto3"; + +package banyandb.property.v1; + +import "banyandb/property/v1/property.proto"; + +option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"; +option java_package = "org.apache.skywalking.banyandb.property.v1"; + +message TreeSlotSHA { + int32 slot = 1; + string value = 2; +} + +message TreeLeafNode { + // slot_index is the index of the slot in the tree. + int32 slot_index = 1; + // if the slot is empty, means the server side don't have the slot. + bool exists = 2; + // if the slot and entity exists, the SHA value of the entity. + string entity = 3; + string sha = 4; +} + +message TreeSummary { + string group = 1; + uint32 shard_id = 2; + bool tree_found = 3; + string root_sha = 4; + repeated TreeSlotSHA slot_sha = 5; +} + +message PropertyMissing { + string entity = 1; +} + +message DifferTreeSummary { + bool tree_found = 1; + // if the nodes is empty, mean the server side don't have more tree leaf nodes to send. 
+ repeated TreeLeafNode nodes = 2; +} + +message PropertySync { + bytes id = 1; + banyandb.property.v1.Property property = 2; + int64 delete_time = 3; +} + +message NoMorePropertySync {} + +message RepairRequest { + oneof data { + // compare stage + TreeSummary tree_summary = 11; Review Comment: Why start at 11? ########## banyand/property/repair.go: ########## @@ -945,6 +1009,21 @@ func (r *repairScheduler) doRepair() (err error) { } // otherwise, we need to build the trees + return r.buildingTree(nil, "", false) +} + +// nolint: contextcheck +func (r *repairScheduler) buildingTree(shards []common.ShardID, group string, force bool) error { + if force { + r.buildTreeLocker.Lock() + } else if !r.buildTreeLocker.TryLock() { + // if not forced, we try to lock the build tree locker + return nil Review Comment: Consider adding metrics for lock contention to monitor repair conflicts. ########## banyand/property/repair_gossip.go: ########## @@ -0,0 +1,696 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package property + +import ( + "context" + "fmt" + "io" + + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/apache/skywalking-banyandb/api/common" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/property/gossip" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" +) + +type repairGossipBase struct { + scheduler *repairScheduler +} + +func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shardID uint32) (repairTreeReader, bool, error) { + s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID)) + if err != nil { + return nil, false, fmt.Errorf("failed to load shard %d: %w", shardID, err) + } + tree, err := s.repairState.treeReader(group) + if err != nil { + return nil, false, fmt.Errorf("failed to get tree reader for group %s: %w", group, err) + } + if tree == nil { + // if the tree is nil, but the state file exist, means the tree(group) is empty + stateExist, err := s.repairState.stateFileExist() + if err != nil { + return nil, false, fmt.Errorf("failed to check state file existence for group %s: %w", group, err) + } + // if the tree is nil, it means the tree is no data + return &emptyRepairTreeReader{}, stateExist, nil + } + return tree, true, nil +} + +func (b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, error) { + root, err := reader.read(nil, 1) + if err != nil { + return nil, nil, fmt.Errorf("failed to read tree root: %w", err) + } + if len(root) == 0 { + return nil, nil, fmt.Errorf("tree root is empty for group %s", group) + } + slots, err := reader.read(root[0], 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to read slots for group %s: %w", group, err) + } + result := &propertyv1.TreeSummary{ + Group: group, + ShardId: shardID, + TreeFound: true, 
+ RootSha: root[0].shaValue, + SlotSha: make([]*propertyv1.TreeSlotSHA, 0, len(slots)), + } + slotsNodes := make(map[int32]*repairTreeNode, len(slots)) + for _, s := range slots { + result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{ + Slot: s.slotInx, + Value: s.shaValue, + }) + slotsNodes[s.slotInx] = s + } + + return result, slotsNodes, nil +} + +func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard *shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) { + g, n, entity, err := syncShard.repairState.parseLeafNodeEntity(leafNodeEntity) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse leaf node entity %s: %w", leafNodeEntity, err) + } + searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, g, n, entityID, entity) + if err != nil { + return nil, nil, fmt.Errorf("failed to build query from leaf node entity %s: %w", leafNodeEntity, err) + } + queriedProperties, err := syncShard.search(ctx, searchQuery, 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to search properties for leaf node entity %s: %w", leafNodeEntity, err) + } + var latestProperty *queryProperty + for _, queried := range queriedProperties { + if latestProperty == nil || queried.timestamp > latestProperty.timestamp { + latestProperty = queried + } + } + if latestProperty == nil { + return nil, nil, nil + } + var p propertyv1.Property + err = protojson.Unmarshal(latestProperty.source, &p) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal property from leaf node entity %s: %w", leafNodeEntity, err) + } + return latestProperty, &p, nil +} + +type repairGossipClient struct { + repairGossipBase +} + +func newRepairGossipClient(s *repairScheduler) *repairGossipClient { + return &repairGossipClient{ + repairGossipBase: repairGossipBase{ + scheduler: s, + }, + } +} + +func (r *repairGossipClient) Rev(ctx context.Context, nextNode *grpclib.ClientConn, request *propertyv1.PropagationRequest) error { 
+ client := propertyv1.NewRepairServiceClient(nextNode) + var hasPropertyUpdated bool + defer func() { + if hasPropertyUpdated { + err := r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, request.Group, true) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to rebuild tree for group %s, shard %d", request.Group, request.ShardId) + } + } + }() + reader, found, err := r.getTreeReader(ctx, request.Group, request.ShardId) + if err != nil { + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get tree reader on client side: %v", err) + } + if !found { + return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group %s, shard %d not found on client side", request.Group, request.ShardId) + } + defer reader.close() + summary, clientSlotNodes, err := r.buildTreeSummary(reader, request.Group, request.ShardId) + if err != nil { + // if the tree summary cannot be built, we should abort the propagation + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to query tree summary on client side: %v", err) + } + + stream, err := client.Repair(ctx) Review Comment: Add stream closure: defer stream.CloseSend() ########## banyand/property/service.go: ########## @@ -87,6 +88,7 @@ func (s *service) FlagSet() *run.FlagSet { flagS.StringVar(&s.repairBuildTreeCron, "property-repair-build-tree-cron", "@every 1h", "the cron expression for repairing the build tree") flagS.DurationVar(&s.repairQuickBuildTreeTime, "property-repair-quick-build-tree-time", time.Minute*10, "the duration of the quick build tree after operate the property") + flagS.StringVar(&s.repairTriggerCron, "property-repair-trigger-cron", "* 2 * * *", "the cron expression for background repairing the property data") Review Comment: Can we disable the background repair through this flag? ########## banyand/property/repair_gossip.go: ########## @@ -0,0 +1,696 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "io" + + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/apache/skywalking-banyandb/api/common" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/property/gossip" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" +) + +type repairGossipBase struct { + scheduler *repairScheduler +} + +func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shardID uint32) (repairTreeReader, bool, error) { + s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID)) + if err != nil { + return nil, false, fmt.Errorf("failed to load shard %d: %w", shardID, err) + } + tree, err := s.repairState.treeReader(group) + if err != nil { + return nil, false, fmt.Errorf("failed to get tree reader for group %s: %w", group, err) + } + if tree == nil { + // if the tree is nil, but the state file exist, means the tree(group) is empty + stateExist, err := s.repairState.stateFileExist() + if err != nil { + return nil, false, fmt.Errorf("failed to check state file existence for group %s: %w", group, err) + } + // if the tree is nil, it means 
the tree is no data + return &emptyRepairTreeReader{}, stateExist, nil + } + return tree, true, nil +} + +func (b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, error) { + root, err := reader.read(nil, 1) + if err != nil { + return nil, nil, fmt.Errorf("failed to read tree root: %w", err) + } + if len(root) == 0 { + return nil, nil, fmt.Errorf("tree root is empty for group %s", group) + } + slots, err := reader.read(root[0], 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to read slots for group %s: %w", group, err) + } + result := &propertyv1.TreeSummary{ + Group: group, + ShardId: shardID, + TreeFound: true, + RootSha: root[0].shaValue, + SlotSha: make([]*propertyv1.TreeSlotSHA, 0, len(slots)), + } + slotsNodes := make(map[int32]*repairTreeNode, len(slots)) + for _, s := range slots { + result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{ + Slot: s.slotInx, + Value: s.shaValue, + }) + slotsNodes[s.slotInx] = s + } + + return result, slotsNodes, nil +} + +func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard *shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) { + g, n, entity, err := syncShard.repairState.parseLeafNodeEntity(leafNodeEntity) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse leaf node entity %s: %w", leafNodeEntity, err) + } + searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, g, n, entityID, entity) + if err != nil { + return nil, nil, fmt.Errorf("failed to build query from leaf node entity %s: %w", leafNodeEntity, err) + } + queriedProperties, err := syncShard.search(ctx, searchQuery, 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to search properties for leaf node entity %s: %w", leafNodeEntity, err) + } + var latestProperty *queryProperty + for _, queried := range queriedProperties { + if latestProperty == nil || queried.timestamp > 
latestProperty.timestamp { + latestProperty = queried + } + } + if latestProperty == nil { + return nil, nil, nil + } + var p propertyv1.Property + err = protojson.Unmarshal(latestProperty.source, &p) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal property from leaf node entity %s: %w", leafNodeEntity, err) + } + return latestProperty, &p, nil +} + +type repairGossipClient struct { + repairGossipBase +} + +func newRepairGossipClient(s *repairScheduler) *repairGossipClient { + return &repairGossipClient{ + repairGossipBase: repairGossipBase{ + scheduler: s, + }, + } +} + +func (r *repairGossipClient) Rev(ctx context.Context, nextNode *grpclib.ClientConn, request *propertyv1.PropagationRequest) error { + client := propertyv1.NewRepairServiceClient(nextNode) + var hasPropertyUpdated bool + defer func() { + if hasPropertyUpdated { + err := r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, request.Group, true) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to rebuild tree for group %s, shard %d", request.Group, request.ShardId) + } + } + }() + reader, found, err := r.getTreeReader(ctx, request.Group, request.ShardId) + if err != nil { + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get tree reader on client side: %v", err) + } + if !found { + return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group %s, shard %d not found on client side", request.Group, request.ShardId) + } + defer reader.close() + summary, clientSlotNodes, err := r.buildTreeSummary(reader, request.Group, request.ShardId) + if err != nil { + // if the tree summary cannot be built, we should abort the propagation + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to query tree summary on client side: %v", err) + } + + stream, err := client.Repair(ctx) + if err != nil { + return fmt.Errorf("failed to create repair stream: %w", err) + } + + // step 1: send merkle tree summary + err = 
+ stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_TreeSummary{ + TreeSummary: summary, + }, + }) + if err != nil { + return fmt.Errorf("failed to send tree summary: %w", err) + } + + serverTreeSummaryResp, err := stream.Recv() + if err != nil { + return fmt.Errorf("failed to receive tree summary from server: %w", err) + } + treeSummaryResp, ok := serverTreeSummaryResp.Data.(*propertyv1.RepairResponse_DifferTreeSummary) + if !ok { + return fmt.Errorf("unexpected response type: %T, expected DifferTreeSummary", serverTreeSummaryResp.Data) + } + if !treeSummaryResp.DifferTreeSummary.TreeFound { + // if the tree is not found, we should abort the propagation + return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group %s not found on server side", request.Group) + } + if len(treeSummaryResp.DifferTreeSummary.Nodes) == 0 { + // there no different nodes, we can skip repair + return nil + } + + // step 2: check with the server for different leaf nodes + syncShard, err := r.scheduler.db.loadShard(ctx, common.ShardID(request.ShardId)) + if err != nil { + return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load failure on client side: %v", request.ShardId, err) + } + leafNodeCache := make(map[int32]map[string]*repairTreeNode) + differTreeSummary := treeSummaryResp.DifferTreeSummary +keepLeafCompare: + r.handleDifferSummaryFromServer(ctx, stream, differTreeSummary, reader, syncShard, clientSlotNodes, leafNodeCache) +keepReceiveServerMsg: Review Comment: "goto" is not recommended. ########## banyand/property/repair_gossip.go: ########## @@ -0,0 +1,696 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership.
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "io" + + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/apache/skywalking-banyandb/api/common" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/property/gossip" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" +) + +type repairGossipBase struct { + scheduler *repairScheduler +} + +func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shardID uint32) (repairTreeReader, bool, error) { + s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID)) + if err != nil { + return nil, false, fmt.Errorf("failed to load shard %d: %w", shardID, err) + } + tree, err := s.repairState.treeReader(group) + if err != nil { + return nil, false, fmt.Errorf("failed to get tree reader for group %s: %w", group, err) + } + if tree == nil { + // if the tree is nil, but the state file exist, means the tree(group) is empty + stateExist, err := s.repairState.stateFileExist() + if err != nil { + return nil, false, fmt.Errorf("failed to check state file existence for group %s: %w", group, err) + } + // if the tree is nil, it means the tree is no data + return &emptyRepairTreeReader{}, stateExist, nil + } + return tree, true, nil +} + +func 
(b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, error) { + root, err := reader.read(nil, 1) + if err != nil { + return nil, nil, fmt.Errorf("failed to read tree root: %w", err) + } + if len(root) == 0 { + return nil, nil, fmt.Errorf("tree root is empty for group %s", group) + } + slots, err := reader.read(root[0], 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to read slots for group %s: %w", group, err) + } + result := &propertyv1.TreeSummary{ + Group: group, + ShardId: shardID, + TreeFound: true, + RootSha: root[0].shaValue, + SlotSha: make([]*propertyv1.TreeSlotSHA, 0, len(slots)), + } + slotsNodes := make(map[int32]*repairTreeNode, len(slots)) + for _, s := range slots { + result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{ + Slot: s.slotInx, + Value: s.shaValue, + }) + slotsNodes[s.slotInx] = s + } + + return result, slotsNodes, nil +} + +func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard *shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) { + g, n, entity, err := syncShard.repairState.parseLeafNodeEntity(leafNodeEntity) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse leaf node entity %s: %w", leafNodeEntity, err) + } + searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, g, n, entityID, entity) + if err != nil { + return nil, nil, fmt.Errorf("failed to build query from leaf node entity %s: %w", leafNodeEntity, err) + } + queriedProperties, err := syncShard.search(ctx, searchQuery, 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to search properties for leaf node entity %s: %w", leafNodeEntity, err) + } + var latestProperty *queryProperty + for _, queried := range queriedProperties { + if latestProperty == nil || queried.timestamp > latestProperty.timestamp { + latestProperty = queried + } + } + if latestProperty == nil { + return nil, nil, nil 
+ } + var p propertyv1.Property + err = protojson.Unmarshal(latestProperty.source, &p) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal property from leaf node entity %s: %w", leafNodeEntity, err) + } + return latestProperty, &p, nil +} + +type repairGossipClient struct { + repairGossipBase +} + +func newRepairGossipClient(s *repairScheduler) *repairGossipClient { + return &repairGossipClient{ + repairGossipBase: repairGossipBase{ + scheduler: s, + }, + } +} + +func (r *repairGossipClient) Rev(ctx context.Context, nextNode *grpclib.ClientConn, request *propertyv1.PropagationRequest) error { + client := propertyv1.NewRepairServiceClient(nextNode) + var hasPropertyUpdated bool + defer func() { + if hasPropertyUpdated { + err := r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, request.Group, true) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to rebuild tree for group %s, shard %d", request.Group, request.ShardId) + } + } + }() + reader, found, err := r.getTreeReader(ctx, request.Group, request.ShardId) + if err != nil { + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get tree reader on client side: %v", err) + } + if !found { + return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group %s, shard %d not found on client side", request.Group, request.ShardId) + } + defer reader.close() + summary, clientSlotNodes, err := r.buildTreeSummary(reader, request.Group, request.ShardId) + if err != nil { + // if the tree summary cannot be built, we should abort the propagation + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to query tree summary on client side: %v", err) + } + + stream, err := client.Repair(ctx) + if err != nil { + return fmt.Errorf("failed to create repair stream: %w", err) + } + + // step 1: send merkle tree summary + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_TreeSummary{ + TreeSummary: summary, + }, + }) + if err != nil { + return 
fmt.Errorf("failed to send tree summary: %w", err) + } + + serverTreeSummaryResp, err := stream.Recv() + if err != nil { + return fmt.Errorf("failed to receive tree summary from server: %w", err) + } + treeSummaryResp, ok := serverTreeSummaryResp.Data.(*propertyv1.RepairResponse_DifferTreeSummary) + if !ok { + return fmt.Errorf("unexpected response type: %T, expected DifferTreeSummary", serverTreeSummaryResp.Data) + } + if !treeSummaryResp.DifferTreeSummary.TreeFound { + // if the tree is not found, we should abort the propagation + return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group %s not found on server side", request.Group) + } + if len(treeSummaryResp.DifferTreeSummary.Nodes) == 0 { + // there no different nodes, we can skip repair + return nil + } + + // step 2: check with the server for different leaf nodes + syncShard, err := r.scheduler.db.loadShard(ctx, common.ShardID(request.ShardId)) + if err != nil { + return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load failure on client side: %v", request.ShardId, err) + } + leafNodeCache := make(map[int32]map[string]*repairTreeNode) + differTreeSummary := treeSummaryResp.DifferTreeSummary +keepLeafCompare: + r.handleDifferSummaryFromServer(ctx, stream, differTreeSummary, reader, syncShard, clientSlotNodes, leafNodeCache) +keepReceiveServerMsg: + // step 3: keep receiving messages from the server + // if the server still sending different nodes, we should keep reading them + // if the server sends a PropertySync, we should repair the property and send the newer property back to the server if needed + serverTreeSummaryResp, err = stream.Recv() + if err != nil { + // if the server side has no more different nodes, we can stop sync + if errors.Is(err, io.EOF) { + return nil + } + return fmt.Errorf("failed to keep receive tree summary from server: %w", err) + } + switch respData := serverTreeSummaryResp.Data.(type) { + case *propertyv1.RepairResponse_DifferTreeSummary: + // keep reading the tree 
summary until there are no more different nodes + differTreeSummary = respData.DifferTreeSummary + goto keepLeafCompare + case *propertyv1.RepairResponse_PropertySync: + sync := respData.PropertySync + updated, newer, err := syncShard.repair(ctx, sync.Id, sync.Property, sync.DeleteTime) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to repair property %s", sync.Id) + r.scheduler.metrics.totalRepairFailedCount.Inc(1, request.Group, fmt.Sprintf("%d", request.ShardId)) + goto keepReceiveServerMsg + } + if updated { + r.scheduler.metrics.totalRepairSuccessCount.Inc(1, request.Group, fmt.Sprintf("%d", request.ShardId)) + } + // if the property hasn't been updated, and the newer property is not nil, + // which means the property is newer than the server side, + if !updated && newer != nil { + var p propertyv1.Property + err = protojson.Unmarshal(newer.source, &p) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to unmarshal property from db by entity %s", newer.id) + goto keepReceiveServerMsg + } + // send the newer property to the server + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_PropertySync{ + PropertySync: &propertyv1.PropertySync{ + Id: newer.id, + Property: &p, + DeleteTime: newer.deleteTime, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send newer property sync response to server, entity: %s", newer.id) + } + } + goto keepReceiveServerMsg + default: + // if the response is not a DifferTreeSummary or PropertySync, then we should ignore it + r.scheduler.l.Warn().Msgf("unexpected response type: %T, expected DifferTreeSummary or PropertySync", respData) + goto keepReceiveServerMsg + } +} + +func (r *repairGossipClient) sendPropertyMissing(stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, propertyv1.RepairResponse], entity string) { + err := stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_PropertyMissing{ + PropertyMissing: 
&propertyv1.PropertyMissing{ + Entity: entity, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send property missing response to client, entity: %s", entity) + } +} + +func (r *repairGossipClient) handleDifferSummaryFromServer( + ctx context.Context, + stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, propertyv1.RepairResponse], + differTreeSummary *propertyv1.DifferTreeSummary, + reader repairTreeReader, + syncShard *shard, + clientSlotNodes map[int32]*repairTreeNode, + leafNodeCache map[int32]map[string]*repairTreeNode, +) { + // if their no more different nodes, means the client side could be send the no more property sync request to notify the server + if len(differTreeSummary.Nodes) == 0 { + err := stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_NoMorePropertySync{ + NoMorePropertySync: &propertyv1.NoMorePropertySync{}, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send no more property sync request to server") + return + } + } + // keep reading the tree summary until there are no more different nodes + for _, node := range differTreeSummary.Nodes { + // if the repair node doesn't exist in the server side, then should send all the real property data to server Review Comment: Add context checking ``` select { case <-ctx.Done(): return ctx.Err() default: // continue processing } ``` ########## banyand/property/repair_gossip.go: ########## @@ -0,0 +1,696 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "io" + + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/apache/skywalking-banyandb/api/common" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/property/gossip" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" +) + +type repairGossipBase struct { + scheduler *repairScheduler +} + +func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shardID uint32) (repairTreeReader, bool, error) { + s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID)) + if err != nil { + return nil, false, fmt.Errorf("failed to load shard %d: %w", shardID, err) + } + tree, err := s.repairState.treeReader(group) + if err != nil { + return nil, false, fmt.Errorf("failed to get tree reader for group %s: %w", group, err) + } + if tree == nil { + // if the tree is nil, but the state file exist, means the tree(group) is empty + stateExist, err := s.repairState.stateFileExist() + if err != nil { + return nil, false, fmt.Errorf("failed to check state file existence for group %s: %w", group, err) + } + // if the tree is nil, it means the tree is no data + return &emptyRepairTreeReader{}, stateExist, nil + } + return tree, true, nil +} + +func (b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, error) { + root, err := reader.read(nil, 
1) + if err != nil { + return nil, nil, fmt.Errorf("failed to read tree root: %w", err) + } + if len(root) == 0 { + return nil, nil, fmt.Errorf("tree root is empty for group %s", group) + } + slots, err := reader.read(root[0], 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to read slots for group %s: %w", group, err) + } + result := &propertyv1.TreeSummary{ + Group: group, + ShardId: shardID, + TreeFound: true, + RootSha: root[0].shaValue, + SlotSha: make([]*propertyv1.TreeSlotSHA, 0, len(slots)), + } + slotsNodes := make(map[int32]*repairTreeNode, len(slots)) + for _, s := range slots { + result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{ + Slot: s.slotInx, + Value: s.shaValue, + }) + slotsNodes[s.slotInx] = s + } + + return result, slotsNodes, nil +} + +func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard *shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) { + g, n, entity, err := syncShard.repairState.parseLeafNodeEntity(leafNodeEntity) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse leaf node entity %s: %w", leafNodeEntity, err) + } + searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, g, n, entityID, entity) + if err != nil { + return nil, nil, fmt.Errorf("failed to build query from leaf node entity %s: %w", leafNodeEntity, err) + } + queriedProperties, err := syncShard.search(ctx, searchQuery, 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to search properties for leaf node entity %s: %w", leafNodeEntity, err) + } + var latestProperty *queryProperty + for _, queried := range queriedProperties { + if latestProperty == nil || queried.timestamp > latestProperty.timestamp { + latestProperty = queried + } + } + if latestProperty == nil { + return nil, nil, nil + } + var p propertyv1.Property + err = protojson.Unmarshal(latestProperty.source, &p) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal property from leaf node entity 
%s: %w", leafNodeEntity, err) + } + return latestProperty, &p, nil +} + +type repairGossipClient struct { + repairGossipBase +} + +func newRepairGossipClient(s *repairScheduler) *repairGossipClient { + return &repairGossipClient{ + repairGossipBase: repairGossipBase{ + scheduler: s, + }, + } +} + +func (r *repairGossipClient) Rev(ctx context.Context, nextNode *grpclib.ClientConn, request *propertyv1.PropagationRequest) error { + client := propertyv1.NewRepairServiceClient(nextNode) + var hasPropertyUpdated bool Review Comment: It's always false. ########## api/proto/banyandb/property/v1/repair.proto: ########## @@ -0,0 +1,97 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto3"; + +package banyandb.property.v1; + +import "banyandb/property/v1/property.proto"; + +option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"; +option java_package = "org.apache.skywalking.banyandb.property.v1"; + +message TreeSlotSHA { + int32 slot = 1; + string value = 2; +} + +message TreeLeafNode { + // slot_index is the index of the slot in the tree. + int32 slot_index = 1; + // if the slot is empty, means the server side don't have the slot. 
+ bool exists = 2; + // if the slot and entity exists, the SHA value of the entity. + string entity = 3; + string sha = 4; +} + +message TreeSummary { + string group = 1; + uint32 shard_id = 2; + bool tree_found = 3; + string root_sha = 4; + repeated TreeSlotSHA slot_sha = 5; +} + +message PropertyMissing { + string entity = 1; +} + +message DifferTreeSummary { + bool tree_found = 1; + // if the nodes is empty, mean the server side don't have more tree leaf nodes to send. + repeated TreeLeafNode nodes = 2; +} + +message PropertySync { + bytes id = 1; + banyandb.property.v1.Property property = 2; + int64 delete_time = 3; +} + +message NoMorePropertySync {} + +message RepairRequest { + oneof data { + // compare stage + TreeSummary tree_summary = 11; + // repair stage + // case 1: client missing but server existing + PropertyMissing property_missing = 12; + // case 2: client existing but server missing + // case 3: SHA value mismatches + PropertySync property_sync = 13; + // if client side is already send all the properties(missing or property sync) + // which means the client side will not sending more properties to sync, server side should close the stream. + NoMorePropertySync no_more_property_sync = 14; Review Comment: It seems not needed. The client can close the stream, and then the server will get an "io.EOF" and close the stream gracefully. ########## banyand/property/repair_gossip.go: ########## @@ -0,0 +1,696 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "io" + + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/apache/skywalking-banyandb/api/common" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/property/gossip" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" +) + +type repairGossipBase struct { + scheduler *repairScheduler +} + +func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shardID uint32) (repairTreeReader, bool, error) { + s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID)) + if err != nil { + return nil, false, fmt.Errorf("failed to load shard %d: %w", shardID, err) + } + tree, err := s.repairState.treeReader(group) + if err != nil { + return nil, false, fmt.Errorf("failed to get tree reader for group %s: %w", group, err) + } + if tree == nil { + // if the tree is nil, but the state file exist, means the tree(group) is empty + stateExist, err := s.repairState.stateFileExist() + if err != nil { + return nil, false, fmt.Errorf("failed to check state file existence for group %s: %w", group, err) + } + // if the tree is nil, it means the tree is no data + return &emptyRepairTreeReader{}, stateExist, nil + } + return tree, true, nil +} + +func (b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, error) { + root, err := reader.read(nil, 
1) + if err != nil { + return nil, nil, fmt.Errorf("failed to read tree root: %w", err) + } + if len(root) == 0 { + return nil, nil, fmt.Errorf("tree root is empty for group %s", group) + } + slots, err := reader.read(root[0], 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to read slots for group %s: %w", group, err) + } + result := &propertyv1.TreeSummary{ + Group: group, + ShardId: shardID, + TreeFound: true, + RootSha: root[0].shaValue, + SlotSha: make([]*propertyv1.TreeSlotSHA, 0, len(slots)), + } + slotsNodes := make(map[int32]*repairTreeNode, len(slots)) + for _, s := range slots { + result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{ + Slot: s.slotInx, + Value: s.shaValue, + }) + slotsNodes[s.slotInx] = s + } + + return result, slotsNodes, nil +} + +func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard *shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) { + g, n, entity, err := syncShard.repairState.parseLeafNodeEntity(leafNodeEntity) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse leaf node entity %s: %w", leafNodeEntity, err) + } + searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, g, n, entityID, entity) + if err != nil { + return nil, nil, fmt.Errorf("failed to build query from leaf node entity %s: %w", leafNodeEntity, err) + } + queriedProperties, err := syncShard.search(ctx, searchQuery, 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to search properties for leaf node entity %s: %w", leafNodeEntity, err) + } + var latestProperty *queryProperty + for _, queried := range queriedProperties { + if latestProperty == nil || queried.timestamp > latestProperty.timestamp { + latestProperty = queried + } + } + if latestProperty == nil { + return nil, nil, nil + } + var p propertyv1.Property + err = protojson.Unmarshal(latestProperty.source, &p) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal property from leaf node entity 
%s: %w", leafNodeEntity, err) + } + return latestProperty, &p, nil +} + +type repairGossipClient struct { + repairGossipBase +} + +func newRepairGossipClient(s *repairScheduler) *repairGossipClient { + return &repairGossipClient{ + repairGossipBase: repairGossipBase{ + scheduler: s, + }, + } +} + +func (r *repairGossipClient) Rev(ctx context.Context, nextNode *grpclib.ClientConn, request *propertyv1.PropagationRequest) error { + client := propertyv1.NewRepairServiceClient(nextNode) + var hasPropertyUpdated bool + defer func() { + if hasPropertyUpdated { + err := r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, request.Group, true) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to rebuild tree for group %s, shard %d", request.Group, request.ShardId) + } + } + }() + reader, found, err := r.getTreeReader(ctx, request.Group, request.ShardId) + if err != nil { + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get tree reader on client side: %v", err) + } + if !found { + return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group %s, shard %d not found on client side", request.Group, request.ShardId) + } + defer reader.close() + summary, clientSlotNodes, err := r.buildTreeSummary(reader, request.Group, request.ShardId) + if err != nil { + // if the tree summary cannot be built, we should abort the propagation + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to query tree summary on client side: %v", err) + } + + stream, err := client.Repair(ctx) + if err != nil { + return fmt.Errorf("failed to create repair stream: %w", err) + } + + // step 1: send merkle tree summary + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_TreeSummary{ + TreeSummary: summary, + }, + }) + if err != nil { + return fmt.Errorf("failed to send tree summary: %w", err) + } + + serverTreeSummaryResp, err := stream.Recv() + if err != nil { + return fmt.Errorf("failed to receive tree summary from server: 
%w", err) + } + treeSummaryResp, ok := serverTreeSummaryResp.Data.(*propertyv1.RepairResponse_DifferTreeSummary) + if !ok { + return fmt.Errorf("unexpected response type: %T, expected DifferTreeSummary", serverTreeSummaryResp.Data) + } + if !treeSummaryResp.DifferTreeSummary.TreeFound { + // if the tree is not found, we should abort the propagation + return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group %s not found on server side", request.Group) + } + if len(treeSummaryResp.DifferTreeSummary.Nodes) == 0 { + // there no different nodes, we can skip repair + return nil + } + + // step 2: check with the server for different leaf nodes + syncShard, err := r.scheduler.db.loadShard(ctx, common.ShardID(request.ShardId)) + if err != nil { + return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load failure on client side: %v", request.ShardId, err) + } + leafNodeCache := make(map[int32]map[string]*repairTreeNode) Review Comment: Consider using a stream pattern to compare the two nodes since the tree's leaves are sorted. ########## banyand/property/repair_gossip.go: ########## @@ -0,0 +1,696 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package property + +import ( + "context" + "fmt" + "io" + + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/apache/skywalking-banyandb/api/common" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/property/gossip" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" +) + +type repairGossipBase struct { + scheduler *repairScheduler +} + +func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shardID uint32) (repairTreeReader, bool, error) { + s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID)) + if err != nil { + return nil, false, fmt.Errorf("failed to load shard %d: %w", shardID, err) + } + tree, err := s.repairState.treeReader(group) + if err != nil { + return nil, false, fmt.Errorf("failed to get tree reader for group %s: %w", group, err) + } + if tree == nil { + // if the tree is nil, but the state file exist, means the tree(group) is empty + stateExist, err := s.repairState.stateFileExist() + if err != nil { + return nil, false, fmt.Errorf("failed to check state file existence for group %s: %w", group, err) + } + // if the tree is nil, it means the tree is no data + return &emptyRepairTreeReader{}, stateExist, nil + } + return tree, true, nil +} + +func (b *repairGossipBase) buildTreeSummary(reader repairTreeReader, group string, shardID uint32) (*propertyv1.TreeSummary, map[int32]*repairTreeNode, error) { + root, err := reader.read(nil, 1) + if err != nil { + return nil, nil, fmt.Errorf("failed to read tree root: %w", err) + } + if len(root) == 0 { + return nil, nil, fmt.Errorf("tree root is empty for group %s", group) + } + slots, err := reader.read(root[0], 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to read slots for group %s: %w", group, err) + } + result := &propertyv1.TreeSummary{ + Group: group, + ShardId: shardID, + TreeFound: true, 
+ RootSha: root[0].shaValue, + SlotSha: make([]*propertyv1.TreeSlotSHA, 0, len(slots)), + } + slotsNodes := make(map[int32]*repairTreeNode, len(slots)) + for _, s := range slots { + result.SlotSha = append(result.SlotSha, &propertyv1.TreeSlotSHA{ + Slot: s.slotInx, + Value: s.shaValue, + }) + slotsNodes[s.slotInx] = s + } + + return result, slotsNodes, nil +} + +func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard *shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) { + g, n, entity, err := syncShard.repairState.parseLeafNodeEntity(leafNodeEntity) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse leaf node entity %s: %w", leafNodeEntity, err) + } + searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, g, n, entityID, entity) + if err != nil { + return nil, nil, fmt.Errorf("failed to build query from leaf node entity %s: %w", leafNodeEntity, err) + } + queriedProperties, err := syncShard.search(ctx, searchQuery, 100) + if err != nil { + return nil, nil, fmt.Errorf("failed to search properties for leaf node entity %s: %w", leafNodeEntity, err) + } + var latestProperty *queryProperty + for _, queried := range queriedProperties { + if latestProperty == nil || queried.timestamp > latestProperty.timestamp { + latestProperty = queried + } + } + if latestProperty == nil { + return nil, nil, nil + } + var p propertyv1.Property + err = protojson.Unmarshal(latestProperty.source, &p) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal property from leaf node entity %s: %w", leafNodeEntity, err) + } + return latestProperty, &p, nil +} + +type repairGossipClient struct { + repairGossipBase +} + +func newRepairGossipClient(s *repairScheduler) *repairGossipClient { + return &repairGossipClient{ + repairGossipBase: repairGossipBase{ + scheduler: s, + }, + } +} + +func (r *repairGossipClient) Rev(ctx context.Context, nextNode *grpclib.ClientConn, request *propertyv1.PropagationRequest) error { 
+ client := propertyv1.NewRepairServiceClient(nextNode) + var hasPropertyUpdated bool + defer func() { + if hasPropertyUpdated { + err := r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, request.Group, true) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to rebuild tree for group %s, shard %d", request.Group, request.ShardId) + } + } + }() + reader, found, err := r.getTreeReader(ctx, request.Group, request.ShardId) + if err != nil { + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get tree reader on client side: %v", err) + } + if !found { + return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group %s, shard %d not found on client side", request.Group, request.ShardId) + } + defer reader.close() + summary, clientSlotNodes, err := r.buildTreeSummary(reader, request.Group, request.ShardId) + if err != nil { + // if the tree summary cannot be built, we should abort the propagation + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to query tree summary on client side: %v", err) + } + + stream, err := client.Repair(ctx) + if err != nil { + return fmt.Errorf("failed to create repair stream: %w", err) + } + + // step 1: send merkle tree summary + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_TreeSummary{ + TreeSummary: summary, + }, + }) + if err != nil { + return fmt.Errorf("failed to send tree summary: %w", err) + } + + serverTreeSummaryResp, err := stream.Recv() + if err != nil { + return fmt.Errorf("failed to receive tree summary from server: %w", err) + } + treeSummaryResp, ok := serverTreeSummaryResp.Data.(*propertyv1.RepairResponse_DifferTreeSummary) + if !ok { + return fmt.Errorf("unexpected response type: %T, expected DifferTreeSummary", serverTreeSummaryResp.Data) + } + if !treeSummaryResp.DifferTreeSummary.TreeFound { + // if the tree is not found, we should abort the propagation + return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group %s not found on 
server side", request.Group) + } + if len(treeSummaryResp.DifferTreeSummary.Nodes) == 0 { + // there no different nodes, we can skip repair + return nil + } + + // step 2: check with the server for different leaf nodes + syncShard, err := r.scheduler.db.loadShard(ctx, common.ShardID(request.ShardId)) + if err != nil { + return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load failure on client side: %v", request.ShardId, err) + } + leafNodeCache := make(map[int32]map[string]*repairTreeNode) + differTreeSummary := treeSummaryResp.DifferTreeSummary +keepLeafCompare: + r.handleDifferSummaryFromServer(ctx, stream, differTreeSummary, reader, syncShard, clientSlotNodes, leafNodeCache) +keepReceiveServerMsg: + // step 3: keep receiving messages from the server + // if the server still sending different nodes, we should keep reading them + // if the server sends a PropertySync, we should repair the property and send the newer property back to the server if needed + serverTreeSummaryResp, err = stream.Recv() + if err != nil { + // if the server side has no more different nodes, we can stop sync + if errors.Is(err, io.EOF) { + return nil + } + return fmt.Errorf("failed to keep receive tree summary from server: %w", err) + } + switch respData := serverTreeSummaryResp.Data.(type) { + case *propertyv1.RepairResponse_DifferTreeSummary: + // keep reading the tree summary until there are no more different nodes + differTreeSummary = respData.DifferTreeSummary + goto keepLeafCompare + case *propertyv1.RepairResponse_PropertySync: + sync := respData.PropertySync + updated, newer, err := syncShard.repair(ctx, sync.Id, sync.Property, sync.DeleteTime) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to repair property %s", sync.Id) + r.scheduler.metrics.totalRepairFailedCount.Inc(1, request.Group, fmt.Sprintf("%d", request.ShardId)) + goto keepReceiveServerMsg + } + if updated { + r.scheduler.metrics.totalRepairSuccessCount.Inc(1, request.Group, fmt.Sprintf("%d", 
request.ShardId)) + } + // if the property hasn't been updated, and the newer property is not nil, + // which means the property is newer than the server side, + if !updated && newer != nil { + var p propertyv1.Property + err = protojson.Unmarshal(newer.source, &p) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to unmarshal property from db by entity %s", newer.id) + goto keepReceiveServerMsg + } + // send the newer property to the server + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_PropertySync{ + PropertySync: &propertyv1.PropertySync{ + Id: newer.id, + Property: &p, + DeleteTime: newer.deleteTime, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send newer property sync response to server, entity: %s", newer.id) + } + } + goto keepReceiveServerMsg + default: + // if the response is not a DifferTreeSummary or PropertySync, then we should ignore it + r.scheduler.l.Warn().Msgf("unexpected response type: %T, expected DifferTreeSummary or PropertySync", respData) + goto keepReceiveServerMsg + } +} + +func (r *repairGossipClient) sendPropertyMissing(stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, propertyv1.RepairResponse], entity string) { + err := stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_PropertyMissing{ + PropertyMissing: &propertyv1.PropertyMissing{ + Entity: entity, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send property missing response to client, entity: %s", entity) + } +} + +func (r *repairGossipClient) handleDifferSummaryFromServer( + ctx context.Context, + stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, propertyv1.RepairResponse], + differTreeSummary *propertyv1.DifferTreeSummary, + reader repairTreeReader, + syncShard *shard, + clientSlotNodes map[int32]*repairTreeNode, + leafNodeCache map[int32]map[string]*repairTreeNode, +) { + // if their no more different nodes, means the 
client side could be send the no more property sync request to notify the server + if len(differTreeSummary.Nodes) == 0 { + err := stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_NoMorePropertySync{ + NoMorePropertySync: &propertyv1.NoMorePropertySync{}, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send no more property sync request to server") + return + } + } + // keep reading the tree summary until there are no more different nodes + for _, node := range differTreeSummary.Nodes { + // if the repair node doesn't exist in the server side, then should send all the real property data to server + if !node.Exists { + clientSlotNode, exist := clientSlotNodes[node.SlotIndex] + if !exist { + r.scheduler.l.Warn().Msgf("client slot %d not exist", node.SlotIndex) + continue + } + // read the leaf nodes from the client side + keepLeafNodesReading: + leafNodes, err := reader.read(clientSlotNode, 10) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to read leaf nodes from client side") + continue + } + if len(leafNodes) == 0 { + continue + } + // reading the real property data from the leaf nodes and sending to the server + for _, leafNode := range leafNodes { + property, p, err := r.queryProperty(ctx, syncShard, leafNode.entity) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to query property for leaf node entity %s", leafNode.entity) + continue + } + if property != nil { + // send the property to the server + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_PropertySync{ + PropertySync: &propertyv1.PropertySync{ + Id: property.id, + Property: p, + DeleteTime: property.deleteTime, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send property sync response to client, entity: %s", leafNode.entity) + } + } + } + // continue reading leaf nodes for the same slot until there are no more + goto keepLeafNodesReading + } + + slotNodes, 
slotNodesExist := clientSlotNodes[node.SlotIndex] + // if slot not exists in client side, then the client should ask the server for the property data of leaf nodes + if !slotNodesExist { + r.sendPropertyMissing(stream, node.Entity) + continue + } + // check the leaf node if exist in the client side or not + cache, cacheExist := leafNodeCache[node.SlotIndex] + if !cacheExist { + cache = make(map[string]*repairTreeNode) + leafNodeCache[node.SlotIndex] = cache + } + clientLeafNode, clientLeafNodeExist, err := r.findExistingLeafNode(cache, reader, slotNodes, node.Entity) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to find existing leaf node for entity %s", node.Entity) + continue + } + if !clientLeafNodeExist { + r.sendPropertyMissing(stream, node.Entity) + continue + } + // if the client leaf node SHA is the same as the server leaf node SHA, then we can skip it + if clientLeafNode.shaValue == node.Sha { + continue + } + property, p, err := r.queryProperty(ctx, syncShard, clientLeafNode.entity) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to query property for leaf node entity %s", clientLeafNode.entity) + continue + } + if property == nil { + continue + } + // send the property to the server + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_PropertySync{ + PropertySync: &propertyv1.PropertySync{ + Id: GetPropertyID(p), + Property: p, + DeleteTime: property.deleteTime, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send property sync request to server, entity: %s", clientLeafNode.entity) + continue + } + } +} + +func (r *repairGossipClient) findExistingLeafNode( + cache map[string]*repairTreeNode, + reader repairTreeReader, + parent *repairTreeNode, + entity string, +) (*repairTreeNode, bool, error) { + // check the cache first + if node, exist := cache[entity]; exist { + return node, true, nil + } + // if not found in the cache, read from the tree +treeReader: + 
leafNodes, err := reader.read(parent, 10) + if err != nil { + return nil, false, fmt.Errorf("failed to read tree for entity %s: %w", entity, err) + } + if len(leafNodes) == 0 { + return nil, false, nil + } + for _, leafNode := range leafNodes { + cache[leafNode.entity] = leafNode + if leafNode.entity == entity { + // if the leaf node is found, cache it and return + cache[entity] = leafNode + return leafNode, true, nil + } + } + goto treeReader +} + +type repairGossipServer struct { + propertyv1.UnimplementedRepairServiceServer + repairGossipBase +} + +func newRepairGossipServer(s *repairScheduler) *repairGossipServer { + return &repairGossipServer{ + repairGossipBase: repairGossipBase{ + scheduler: s, Review Comment: Shared state in repairScheduler could be accessed concurrently without protection -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org