This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new ff1c63e33 Support gossip repair protocol in schema property server (#982)
ff1c63e33 is described below

commit ff1c63e3311f90a8c67c9421362a1fed8b464d02
Author: mrproliu <[email protected]>
AuthorDate: Mon Mar 2 15:17:42 2026 +0800

    Support gossip repair protocol in schema property server (#982)
    
    * Support gossip repair protocol in schema property server
---
 api/common/id.go                                   |  10 +-
 api/proto/banyandb/database/v1/database.proto      |   1 +
 banyand/metadata/client.go                         |   1 +
 .../metadata/schema/schemaserver/repair_service.go | 196 +++++++++
 .../schema/schemaserver/repair_service_test.go     | 452 +++++++++++++++++++++
 banyand/metadata/schema/schemaserver/service.go    |   7 +
 banyand/metadata/service/server.go                 |  44 +-
 banyand/property/db/repair.go                      |   5 +
 banyand/property/db/repair_gossip_test.go          |   4 +-
 banyand/property/gossip/client.go                  |   2 +-
 banyand/property/gossip/server.go                  |  17 +-
 banyand/property/gossip/service.go                 |  56 +--
 banyand/property/gossip/service_test.go            |   4 +-
 banyand/property/gossip/trace.go                   |   6 +-
 banyand/property/service.go                        |   4 +-
 banyand/queue/sub/server.go                        |   1 +
 bydbctl/internal/cmd/property_test.go              |  12 +-
 docs/api-reference.md                              |   1 +
 pkg/cmdsetup/data.go                               |   3 +-
 pkg/cmdsetup/liaison.go                            |   2 +-
 pkg/cmdsetup/standalone.go                         |   2 +-
 21 files changed, 778 insertions(+), 52 deletions(-)

diff --git a/api/common/id.go b/api/common/id.go
index b5437f758..0df3ef2b1 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -154,8 +154,9 @@ type Node struct {
        GrpcAddress string
        HTTPAddress string
 
-       PropertyGossipGrpcAddress string
-       PropertySchemaGrpcAddress string
+       PropertyGossipGrpcAddress       string
+       PropertySchemaGrpcAddress       string
+       PropertySchemaGossipGrpcAddress string
 }
 
 var (
@@ -197,7 +198,7 @@ func ParseNodeHostProvider(s string) (NodeHostProvider, error) {
 }
 
 // GenerateNode generates a node id.
-func GenerateNode(grpcPort, httpPort, propertyGrpcPort, propertySchemaGrpcPort 
*uint32) (node Node, err error) {
+func GenerateNode(grpcPort, httpPort, propertyGrpcPort, 
propertySchemaGrpcPort, propertySchemaGossipPort *uint32) (node Node, err 
error) {
        port := grpcPort
        if port == nil {
                port = httpPort
@@ -237,6 +238,9 @@ func GenerateNode(grpcPort, httpPort, propertyGrpcPort, propertySchemaGrpcPort *
        if propertySchemaGrpcPort != nil {
                node.PropertySchemaGrpcAddress = net.JoinHostPort(nodeHost, 
strconv.FormatUint(uint64(*propertySchemaGrpcPort), 10))
        }
+       if propertySchemaGossipPort != nil {
+               node.PropertySchemaGossipGrpcAddress = 
net.JoinHostPort(nodeHost, 
strconv.FormatUint(uint64(*propertySchemaGossipPort), 10))
+       }
        node.Labels = ParseNodeFlags()
        return node, nil
 }
diff --git a/api/proto/banyandb/database/v1/database.proto b/api/proto/banyandb/database/v1/database.proto
index 7db7783c0..a564eb157 100644
--- a/api/proto/banyandb/database/v1/database.proto
+++ b/api/proto/banyandb/database/v1/database.proto
@@ -42,6 +42,7 @@ message Node {
   map<string, string> labels = 6;
   string property_repair_gossip_grpc_address = 7;
   string property_schema_grpc_address = 8;
+  string property_schema_gossip_grpc_address = 9;
 }
 
 message Shard {
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 096c9e4bf..ba1c5ea8f 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -436,6 +436,7 @@ func (s *clientService) registerNodeIfNeeded(ctx context.Context, l *logger.Logg
 
                PropertyRepairGossipGrpcAddress: node.PropertyGossipGrpcAddress,
                PropertySchemaGrpcAddress:       node.PropertySchemaGrpcAddress,
+               PropertySchemaGossipGrpcAddress: 
node.PropertySchemaGossipGrpcAddress,
        }
        for {
                ctxCancelable, cancel := context.WithTimeout(ctx, 
time.Second*10)
diff --git a/banyand/metadata/schema/schemaserver/repair_service.go b/banyand/metadata/schema/schemaserver/repair_service.go
new file mode 100644
index 000000000..56532fffe
--- /dev/null
+++ b/banyand/metadata/schema/schemaserver/repair_service.go
@@ -0,0 +1,196 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/robfig/cron/v3"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// RepairService is the independent schema repair service.
+type RepairService interface {
+       run.PreRunner
+       run.Config
+       run.Service
+       GetGossipPort() *uint32
+}
+
+type repairService struct {
+       schema.UnimplementedOnInitHandler
+       registerGossip func(gossip.Messenger)
+       metadata       metadata.Repo
+       messenger      gossip.Messenger
+       scheduler      *timestamp.Scheduler
+       closer         *run.Closer
+       l              *logger.Logger
+       metaNodes      map[string]struct{}
+       repairCron     string
+       mu             sync.RWMutex
+}
+
+// NewGossipService creates a new schema repair service.
+func NewGossipService(registerGossip func(gossip.Messenger), metadata 
metadata.Repo,
+       pipelineClient queue.Client, omr observability.MetricsRegistry,
+) RepairService {
+       return &repairService{
+               registerGossip: registerGossip,
+               metadata:       metadata,
+               closer:         run.NewCloser(1),
+               metaNodes:      make(map[string]struct{}),
+               messenger: gossip.NewMessenger(
+                       "schema-property",
+                       func(n *databasev1.Node) string { return 
n.PropertySchemaGossipGrpcAddress },
+                       omr, metadata, pipelineClient, 17933,
+               ),
+       }
+}
+
+func (r *repairService) Name() string {
+       return "schema-repair"
+}
+
+func (r *repairService) Role() databasev1.Role {
+       return databasev1.Role_ROLE_META
+}
+
+func (r *repairService) FlagSet() *run.FlagSet {
+       flagS := run.NewFlagSet("schema-property-repair")
+       flagS.StringVar(&r.repairCron, "schema-property-repair-trigger-cron", 
"@every 10m", "the cron expression for schema repair gossip")
+       flagS.AddFlagSet(r.messenger.FlagSet().FlagSet)
+       return flagS
+}
+
+func (r *repairService) Validate() error {
+       _, cronErr := cron.ParseStandard(r.repairCron)
+       if cronErr != nil {
+               return fmt.Errorf("schema-property-repair-trigger-cron is not a 
valid cron expression: %w", cronErr)
+       }
+       return r.messenger.Validate()
+}
+
+func (r *repairService) PreRun(ctx context.Context) error {
+       r.l = logger.GetLogger("schema-repair")
+       if preRunErr := r.messenger.PreRun(ctx); preRunErr != nil {
+               return preRunErr
+       }
+       r.registerGossip(r.messenger)
+       r.metadata.RegisterHandler("schema-repair-nodes", schema.KindNode, r)
+       r.scheduler = timestamp.NewScheduler(r.l, timestamp.NewClock())
+       registerErr := r.scheduler.Register("schema-property-repair-trigger",
+               
cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
+               r.repairCron, func(_ time.Time, l *logger.Logger) bool {
+                       l.Debug().Msg("starting schema repair gossip")
+                       if repairErr := r.doRepairGossip(); repairErr != nil {
+                               l.Err(repairErr).Msg("schema repair gossip 
failed")
+                       }
+                       return true
+               })
+       if registerErr != nil {
+               return fmt.Errorf("failed to register schema repair cron task: 
%w", registerErr)
+       }
+       return nil
+}
+
+func (r *repairService) Serve() run.StopNotify {
+       r.messenger.Serve(r.closer)
+       return r.closer.CloseNotify()
+}
+
+func (r *repairService) GracefulStop() {
+       if r.scheduler != nil {
+               r.scheduler.Close()
+       }
+       r.messenger.GracefulStop()
+       r.closer.CloseThenWait()
+}
+
+// GetGossipPort returns the gossip gRPC port.
+func (r *repairService) GetGossipPort() *uint32 {
+       return r.messenger.GetServerPort()
+}
+
+// OnAddOrUpdate tracks META nodes that have a schema repair gossip address.
+func (r *repairService) OnAddOrUpdate(md schema.Metadata) {
+       if md.Kind != schema.KindNode {
+               return
+       }
+       node, ok := md.Spec.(*databasev1.Node)
+       if !ok {
+               return
+       }
+       containsMetaRole := false
+       for _, role := range node.Roles {
+               if role == databasev1.Role_ROLE_META {
+                       containsMetaRole = true
+                       break
+               }
+       }
+       if !containsMetaRole {
+               return
+       }
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       if node.PropertySchemaGossipGrpcAddress == "" {
+               delete(r.metaNodes, node.Metadata.GetName())
+               return
+       }
+       r.metaNodes[node.Metadata.GetName()] = struct{}{}
+}
+
+// OnDelete removes nodes from tracking.
+func (r *repairService) OnDelete(md schema.Metadata) {
+       if md.Kind != schema.KindNode {
+               return
+       }
+       node, ok := md.Spec.(*databasev1.Node)
+       if !ok {
+               return
+       }
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       delete(r.metaNodes, node.Metadata.GetName())
+}
+
+func (r *repairService) doRepairGossip() error {
+       r.mu.RLock()
+       nodes := make([]string, 0, len(r.metaNodes))
+       for name := range r.metaNodes {
+               nodes = append(nodes, name)
+       }
+       r.mu.RUnlock()
+       if len(nodes) < 2 {
+               r.l.Debug().Msg("schema repair gossip is skipped because there 
are " +
+                       "less than 2 meta nodes with schema repair gossip 
address")
+               return nil
+       }
+       return r.messenger.Propagation(nodes, schema.SchemaGroup, 0)
+}
diff --git a/banyand/metadata/schema/schemaserver/repair_service_test.go b/banyand/metadata/schema/schemaserver/repair_service_test.go
new file mode 100644
index 000000000..68d671aff
--- /dev/null
+++ b/banyand/metadata/schema/schemaserver/repair_service_test.go
@@ -0,0 +1,452 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schemaserver
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// mockPropagationMessenger records Propagation calls for unit testing.
+type mockPropagationMessenger struct {
+       gossip.Messenger
+       propagationCall *propagationCall
+       mu              sync.Mutex
+}
+
+type propagationCall struct {
+       group   string
+       nodes   []string
+       shardID uint32
+}
+
+func (m *mockPropagationMessenger) Propagation(nodes []string, group string, 
shardID uint32) error {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.propagationCall = &propagationCall{nodes: nodes, group: group, 
shardID: shardID}
+       return nil
+}
+
+func (m *mockPropagationMessenger) getPropagationCall() *propagationCall {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       return m.propagationCall
+}
+
+func TestRepairServiceNodeTracking(t *testing.T) {
+       t.Run("add node with address", func(t *testing.T) {
+               rs := &repairService{metaNodes: make(map[string]struct{}), l: 
logger.GetLogger("test-repair")}
+               rs.OnAddOrUpdate(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{Name: "node-1", Kind: 
schema.KindNode},
+                       Spec: &databasev1.Node{
+                               Metadata:                        
&commonv1.Metadata{Name: "node-1"},
+                               Roles:                           
[]databasev1.Role{databasev1.Role_ROLE_META},
+                               PropertySchemaGossipGrpcAddress: 
"127.0.0.1:9999",
+                       },
+               })
+               rs.mu.RLock()
+               defer rs.mu.RUnlock()
+               _, exists := rs.metaNodes["node-1"]
+               assert.True(t, exists, "node should be tracked")
+       })
+
+       t.Run("add node without address", func(t *testing.T) {
+               rs := &repairService{metaNodes: make(map[string]struct{}), l: 
logger.GetLogger("test-repair")}
+               rs.OnAddOrUpdate(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{Name: "node-1", Kind: 
schema.KindNode},
+                       Spec: &databasev1.Node{
+                               Metadata:                        
&commonv1.Metadata{Name: "node-1"},
+                               PropertySchemaGossipGrpcAddress: "",
+                       },
+               })
+               rs.mu.RLock()
+               defer rs.mu.RUnlock()
+               assert.Empty(t, rs.metaNodes, "node without address should not 
be tracked")
+       })
+
+       t.Run("add non-node kind", func(t *testing.T) {
+               rs := &repairService{metaNodes: make(map[string]struct{}), l: 
logger.GetLogger("test-repair")}
+               rs.OnAddOrUpdate(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{Name: "group-1", Kind: 
schema.KindGroup},
+                       Spec:     &commonv1.Group{Metadata: 
&commonv1.Metadata{Name: "group-1"}},
+               })
+               rs.mu.RLock()
+               defer rs.mu.RUnlock()
+               assert.Empty(t, rs.metaNodes, "non-node kind should not be 
tracked")
+       })
+
+       t.Run("delete node", func(t *testing.T) {
+               rs := &repairService{metaNodes: make(map[string]struct{}), l: 
logger.GetLogger("test-repair")}
+               rs.metaNodes["node-1"] = struct{}{}
+               rs.OnDelete(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{Name: "node-1", Kind: 
schema.KindNode},
+                       Spec: &databasev1.Node{
+                               Metadata: &commonv1.Metadata{Name: "node-1"},
+                       },
+               })
+               rs.mu.RLock()
+               defer rs.mu.RUnlock()
+               _, exists := rs.metaNodes["node-1"]
+               assert.False(t, exists, "deleted node should be removed")
+       })
+
+       t.Run("delete non-existing node", func(t *testing.T) {
+               rs := &repairService{metaNodes: make(map[string]struct{}), l: 
logger.GetLogger("test-repair")}
+               rs.OnDelete(schema.Metadata{
+                       TypeMeta: schema.TypeMeta{Name: "unknown", Kind: 
schema.KindNode},
+                       Spec: &databasev1.Node{
+                               Metadata: &commonv1.Metadata{Name: "unknown"},
+                       },
+               })
+               rs.mu.RLock()
+               defer rs.mu.RUnlock()
+               assert.Empty(t, rs.metaNodes, "deleting non-existing node 
should not panic")
+       })
+}
+
+func TestRepairServiceDoRepairGossip(t *testing.T) {
+       t.Run("zero nodes", func(t *testing.T) {
+               mock := &mockPropagationMessenger{}
+               rs := &repairService{metaNodes: make(map[string]struct{}), 
messenger: mock, l: logger.GetLogger("test-repair")}
+               repairErr := rs.doRepairGossip()
+               require.NoError(t, repairErr)
+               assert.Nil(t, mock.getPropagationCall(), "should not propagate 
with zero nodes")
+       })
+
+       t.Run("one node", func(t *testing.T) {
+               mock := &mockPropagationMessenger{}
+               rs := &repairService{metaNodes: map[string]struct{}{"node-1": 
{}}, messenger: mock, l: logger.GetLogger("test-repair")}
+               repairErr := rs.doRepairGossip()
+               require.NoError(t, repairErr)
+               assert.Nil(t, mock.getPropagationCall(), "should not propagate 
with one node")
+       })
+
+       t.Run("two nodes", func(t *testing.T) {
+               mock := &mockPropagationMessenger{}
+               rs := &repairService{
+                       metaNodes: map[string]struct{}{"node-1": {}, "node-2": 
{}},
+                       messenger: mock,
+                       l:         logger.GetLogger("test-repair"),
+               }
+               repairErr := rs.doRepairGossip()
+               require.NoError(t, repairErr)
+               call := mock.getPropagationCall()
+               require.NotNil(t, call, "should propagate with two nodes")
+               assert.Len(t, call.nodes, 2)
+               assert.Equal(t, schema.SchemaGroup, call.group)
+               assert.Equal(t, uint32(0), call.shardID)
+       })
+
+       t.Run("three nodes", func(t *testing.T) {
+               mock := &mockPropagationMessenger{}
+               rs := &repairService{
+                       metaNodes: map[string]struct{}{"node-1": {}, "node-2": 
{}, "node-3": {}},
+                       messenger: mock,
+                       l:         logger.GetLogger("test-repair"),
+               }
+               repairErr := rs.doRepairGossip()
+               require.NoError(t, repairErr)
+               call := mock.getPropagationCall()
+               require.NotNil(t, call, "should propagate with three nodes")
+               assert.Len(t, call.nodes, 3)
+               assert.Equal(t, schema.SchemaGroup, call.group)
+               assert.Equal(t, uint32(0), call.shardID)
+       })
+}
+
+// testNode holds resources for one gossip-enabled node in integration tests.
+type testNode struct {
+       srv       *server
+       messenger gossip.Messenger
+       nodeID    string
+}
+
+// setupTestNode creates a schema server (which owns the DB) and a gossip messenger, wired together.
+func setupTestNode(t *testing.T) *testNode {
+       t.Helper()
+
+       // Use NewServer from service.go — it handles DB creation, snapshot 
config, repair config, etc.
+       srv := NewServer(observability.BypassRegistry).(*server)
+       flagSet := srv.FlagSet()
+       require.NoError(t, flagSet.Parse([]string{
+               "--schema-server-root-path", t.TempDir(),
+               "--schema-server-grpc-host", "127.0.0.1",
+               "--schema-server-grpc-port", fmt.Sprintf("%d", getFreePort(t)),
+               "--schema-server-repair-build-tree-cron", "@every 10m",
+               "--schema-server-repair-quick-build-tree-time", "6s",
+       }))
+       require.NoError(t, srv.Validate())
+       require.NoError(t, srv.PreRun(context.Background()))
+       srv.Serve()
+       t.Cleanup(func() { srv.GracefulStop() })
+
+       // Create gossip messenger for the repair protocol.
+       gossipPort := getFreePort(t)
+       addr := fmt.Sprintf("127.0.0.1:%d", gossipPort)
+       messenger := gossip.NewMessengerWithoutMetadata("schema-repair",
+               func(n *databasev1.Node) string { return 
n.PropertySchemaGossipGrpcAddress },
+               observability.NewBypassRegistry(), int(gossipPort))
+       require.NoError(t, messenger.Validate())
+       ctx := context.WithValue(context.Background(), common.ContextNodeKey, 
common.Node{
+               NodeID:                          addr,
+               PropertySchemaGossipGrpcAddress: addr,
+       })
+       require.NoError(t, messenger.PreRun(ctx))
+       // RegisterGossip must happen after PreRun because PreRun resets 
listeners.
+       srv.RegisterGossip(messenger)
+       messenger.Serve(run.NewCloser(0))
+       t.Cleanup(messenger.GracefulStop)
+
+       require.Eventually(t, func() bool {
+               conn, dialErr := net.DialTimeout("tcp", addr, time.Second)
+               if dialErr != nil {
+                       return false
+               }
+               _ = conn.Close()
+               return true
+       }, 10*time.Second, 100*time.Millisecond, "gossip server did not start")
+
+       return &testNode{srv: srv, messenger: messenger, nodeID: addr}
+}
+
+func registerNodes(nodes []*testNode) {
+       for _, m := range nodes {
+               for _, n := range nodes {
+                       
m.messenger.(schema.EventHandler).OnAddOrUpdate(schema.Metadata{
+                               TypeMeta: schema.TypeMeta{
+                                       Name: n.nodeID,
+                                       Kind: schema.KindNode,
+                               },
+                               Spec: &databasev1.Node{
+                                       Metadata:                        
&commonv1.Metadata{Name: n.nodeID},
+                                       Roles:                           
[]databasev1.Role{databasev1.Role_ROLE_DATA},
+                                       PropertySchemaGossipGrpcAddress: 
n.nodeID,
+                               },
+                       })
+               }
+       }
+}
+
+func writeProperty(t *testing.T, d db.Database, name, id string, version 
int64) {
+       t.Helper()
+       prop := &propertyv1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group:       schema.SchemaGroup,
+                       Name:        name,
+                       ModRevision: version,
+               },
+               Id: id,
+               Tags: []*modelv1.Tag{
+                       {Key: "version", Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_Str{
+                               Str: &modelv1.Str{Value: fmt.Sprintf("%d", 
version)},
+                       }}},
+               },
+       }
+       require.NoError(t, d.Update(context.Background(), common.ShardID(0), 
db.GetPropertyID(prop), prop))
+}
+
+func ensureShard(t *testing.T, nodes []*testNode) {
+       t.Helper()
+       for _, n := range nodes {
+               writeProperty(t, n.srv.db, "seed", "seed-init", 0)
+       }
+}
+
+func propagateAndVerify(t *testing.T, propagator gossip.Messenger, nodeIDs 
[]string, verifyFn func() bool) {
+       t.Helper()
+       require.Eventually(t, func() bool {
+               _ = propagator.Propagation(nodeIDs, schema.SchemaGroup, 0)
+               return verifyFn()
+       }, 30*time.Second, time.Second)
+}
+
+func queryHasAllIDs(d db.Database, expectedIDs []string) bool {
+       results, queryErr := d.Query(context.Background(), 
&propertyv1.QueryRequest{
+               Groups: []string{schema.SchemaGroup},
+               Name:   "test",
+       })
+       if queryErr != nil {
+               return false
+       }
+       foundIDs := make(map[string]bool)
+       for _, r := range results {
+               if r.DeleteTime() == 0 {
+                       foundIDs[string(r.ID())] = true
+               }
+       }
+       for _, expectedID := range expectedIDs {
+               if !foundIDs[expectedID] {
+                       return false
+               }
+       }
+       return true
+}
+
+func queryLatestVersion(d db.Database, group, name, id string) int64 {
+       results, queryErr := d.Query(context.Background(), 
&propertyv1.QueryRequest{
+               Groups: []string{group},
+               Name:   name,
+               Ids:    []string{id},
+       })
+       if queryErr != nil {
+               return -1
+       }
+       var latest int64
+       for _, r := range results {
+               if r.DeleteTime() == 0 && r.Timestamp() > latest {
+                       latest = r.Timestamp()
+               }
+       }
+       return latest
+}
+
+func verifyPropertyData(t *testing.T, d db.Database, id string, 
expectedVersion int64) {
+       t.Helper()
+       results, queryErr := d.Query(context.Background(), 
&propertyv1.QueryRequest{
+               Groups: []string{schema.SchemaGroup},
+               Name:   "test",
+               Ids:    []string{id},
+       })
+       require.NoError(t, queryErr)
+       var found bool
+       for _, r := range results {
+               if r.DeleteTime() != 0 {
+                       continue
+               }
+               var prop propertyv1.Property
+               require.NoError(t, protojson.Unmarshal(r.Source(), &prop))
+               if prop.Id != id {
+                       continue
+               }
+               found = true
+               assert.Equal(t, schema.SchemaGroup, prop.Metadata.Group, "group 
mismatch for %s", id)
+               assert.Equal(t, "test", prop.Metadata.Name, "name mismatch for 
%s", id)
+               assert.Equal(t, expectedVersion, prop.Metadata.ModRevision, 
"version mismatch for %s", id)
+               require.NotEmpty(t, prop.Tags, "tags should not be empty for 
%s", id)
+               assert.Equal(t, "version", prop.Tags[0].Key, "tag key mismatch 
for %s", id)
+               assert.Equal(t, fmt.Sprintf("%d", expectedVersion), 
prop.Tags[0].Value.GetStr().Value,
+                       "tag value mismatch for %s", id)
+       }
+       assert.True(t, found, "property %s/test/%s not found", 
schema.SchemaGroup, id)
+}
+
+func TestRepairServiceDataSyncTwoNodes(t *testing.T) {
+       node0 := setupTestNode(t)
+       node1 := setupTestNode(t)
+       nodes := []*testNode{node0, node1}
+       registerNodes(nodes)
+       ensureShard(t, nodes)
+
+       writeProperty(t, node0.srv.db, "test", "entity-0", 1)
+
+       nodeIDs := []string{node0.nodeID, node1.nodeID}
+       expectedIDs := []string{schema.SchemaGroup + "/test/entity-0/1"}
+       propagateAndVerify(t, node0.messenger, nodeIDs, func() bool {
+               return queryHasAllIDs(node1.srv.db, expectedIDs)
+       })
+       verifyPropertyData(t, node1.srv.db, "entity-0", 1)
+}
+
+func TestRepairServiceDataSyncThreeNodes(t *testing.T) {
+       node0 := setupTestNode(t)
+       node1 := setupTestNode(t)
+       node2 := setupTestNode(t)
+       nodes := []*testNode{node0, node1, node2}
+       registerNodes(nodes)
+       ensureShard(t, nodes)
+
+       writeProperty(t, node0.srv.db, "test", "entity-0", 1)
+       writeProperty(t, node1.srv.db, "test", "entity-1", 2)
+
+       allNodeIDs := []string{node0.nodeID, node1.nodeID, node2.nodeID}
+       expectedIDs := []string{
+               schema.SchemaGroup + "/test/entity-0/1",
+               schema.SchemaGroup + "/test/entity-1/2",
+       }
+       propagateAndVerify(t, node0.messenger, allNodeIDs, func() bool {
+               return queryHasAllIDs(node0.srv.db, expectedIDs) &&
+                       queryHasAllIDs(node1.srv.db, expectedIDs) &&
+                       queryHasAllIDs(node2.srv.db, expectedIDs)
+       })
+       for _, n := range nodes {
+               verifyPropertyData(t, n.srv.db, "entity-0", 1)
+               verifyPropertyData(t, n.srv.db, "entity-1", 2)
+       }
+}
+
+func TestRepairServiceDataSyncVersionConflict(t *testing.T) {
+       node0 := setupTestNode(t)
+       node1 := setupTestNode(t)
+       nodes := []*testNode{node0, node1}
+       registerNodes(nodes)
+       ensureShard(t, nodes)
+
+       writeProperty(t, node0.srv.db, "test", "entity-0", 1)
+       writeProperty(t, node1.srv.db, "test", "entity-0", 2)
+
+       nodeIDs := []string{node0.nodeID, node1.nodeID}
+       propagateAndVerify(t, node0.messenger, nodeIDs, func() bool {
+               return queryLatestVersion(node0.srv.db, schema.SchemaGroup, 
"test", "entity-0") == 2
+       })
+       verifyPropertyData(t, node0.srv.db, "entity-0", 2)
+}
+
+func TestRepairServiceDataSyncMissingToFull(t *testing.T) {
+       node0 := setupTestNode(t)
+       node1 := setupTestNode(t)
+       nodes := []*testNode{node0, node1}
+       registerNodes(nodes)
+       ensureShard(t, nodes)
+
+       writeProperty(t, node0.srv.db, "test", "entity-0", 1)
+       writeProperty(t, node0.srv.db, "test", "entity-1", 2)
+       writeProperty(t, node0.srv.db, "test", "entity-2", 3)
+
+       nodeIDs := []string{node0.nodeID, node1.nodeID}
+       expectedIDs := []string{
+               schema.SchemaGroup + "/test/entity-0/1",
+               schema.SchemaGroup + "/test/entity-1/2",
+               schema.SchemaGroup + "/test/entity-2/3",
+       }
+       propagateAndVerify(t, node0.messenger, nodeIDs, func() bool {
+               return queryHasAllIDs(node1.srv.db, expectedIDs)
+       })
+       verifyPropertyData(t, node1.srv.db, "entity-0", 1)
+       verifyPropertyData(t, node1.srv.db, "entity-1", 2)
+       verifyPropertyData(t, node1.srv.db, "entity-2", 3)
+}
diff --git a/banyand/metadata/schema/schemaserver/service.go b/banyand/metadata/schema/schemaserver/service.go
index 32477beee..4f613faf2 100644
--- a/banyand/metadata/schema/schemaserver/service.go
+++ b/banyand/metadata/schema/schemaserver/service.go
@@ -44,6 +44,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/property/db"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -68,6 +69,7 @@ type Server interface {
        run.Config
        run.Service
        GetPort() *uint32
+       RegisterGossip(messenger gossip.Messenger)
 }
 
 type server struct {
@@ -112,6 +114,11 @@ func (s *server) GetPort() *uint32 {
        return &s.port
 }
 
+// RegisterGossip registers the DB's repair gRPC services with the gossip messenger.
+func (s *server) RegisterGossip(messenger gossip.Messenger) {
+       s.db.RegisterGossip(messenger)
+}
+
 func (s *server) Role() databasev1.Role {
        return databasev1.Role_ROLE_META
 }
diff --git a/banyand/metadata/service/server.go b/banyand/metadata/service/server.go
index cdb9d322d..cc6577643 100644
--- a/banyand/metadata/service/server.go
+++ b/banyand/metadata/service/server.go
@@ -36,6 +36,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema/schemaserver"
        "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -50,12 +51,16 @@ var (
 type Service interface {
        metadata.Service
        GetSchemaServerPort() *uint32
+       GetSchemaGossipPort() *uint32
+       SetPropertyPipelineClient(queue.Client)
 }
 
 type server struct {
        metadata.Service
        etcdServer              embeddedetcd.Server
        propServer              schemaserver.Server
+       repairSvc               schemaserver.RepairService
+       pipelineClient          queue.Client
        omr                     observability.MetricsRegistry
        serviceFlags            *run.FlagSet
        scheduler               *timestamp.Scheduler
@@ -78,7 +83,8 @@ func (s *server) Name() string {
 }
 
 func (s *server) Role() databasev1.Role {
-       if s.schemaRegistryMode == schemaTypeProperty {
+       needEtcd := s.schemaRegistryMode == schemaTypeEtcd || s.nodeDiscoveryMode == metadata.NodeDiscoveryModeEtcd
+       if s.schemaRegistryMode == schemaTypeProperty || (s.embedded && needEtcd) {
                return databasev1.Role_ROLE_META
        }
        return databasev1.Role_ROLE_UNSPECIFIED
@@ -155,7 +161,12 @@ func (s *server) Validate() error {
                return validateErr
        }
        if s.schemaRegistryMode == schemaTypeProperty && s.propServer != nil {
-               return s.propServer.Validate()
+               if propValidateErr := s.propServer.Validate(); propValidateErr != nil {
+                       return propValidateErr
+               }
+               if s.repairSvc != nil {
+                       return s.repairSvc.Validate()
+               }
        }
        return nil
 }
@@ -175,11 +186,17 @@ func (s *server) PreRun(ctx context.Context) error {
        switch s.schemaRegistryMode {
        case schemaTypeEtcd:
                s.propServer = nil
+               s.repairSvc = nil
        case schemaTypeProperty:
                ctx = s.enrichContextWithSchemaAddress(ctx)
                if propPreRunErr := s.propServer.PreRun(ctx); propPreRunErr != nil {
                        return propPreRunErr
                }
+               if s.repairSvc != nil {
+                       if repairPreRunErr := s.repairSvc.PreRun(ctx); repairPreRunErr != nil {
+                               return repairPreRunErr
+                       }
+               }
        default:
                return errors.New("unknown schema storage type")
        }
@@ -194,6 +211,13 @@ func (s *server) Serve() run.StopNotify {
                        <-s.propServer.Serve()
                }()
        }
+       if s.repairSvc != nil {
+               s.closer.AddRunning()
+               go func() {
+                       defer s.closer.Done()
+                       <-s.repairSvc.Serve()
+               }()
+       }
        if s.etcdServer != nil {
                s.registerDefrag()
                s.closer.AddRunning()
@@ -207,6 +231,9 @@ func (s *server) Serve() run.StopNotify {
 }
 
 func (s *server) GracefulStop() {
+       if s.repairSvc != nil {
+               s.repairSvc.GracefulStop()
+       }
        if s.propServer != nil {
                s.propServer.GracefulStop()
        }
@@ -258,6 +285,19 @@ func (s *server) GetSchemaServerPort() *uint32 {
        return nil
 }
 
+// SetPropertyPipelineClient injects the pipeline client used by the schema gossip repair service.
+func (s *server) SetPropertyPipelineClient(client queue.Client) {
+       s.pipelineClient = client
+}
+
+// GetSchemaGossipPort returns the schema gossip gRPC port.
+func (s *server) GetSchemaGossipPort() *uint32 {
+       if s.repairSvc != nil {
+               return s.repairSvc.GetGossipPort()
+       }
+       return nil
+}
+
 func (s *server) enrichContextWithSchemaAddress(ctx context.Context) context.Context {
        port := s.propServer.GetPort()
        if port == nil {
diff --git a/banyand/property/db/repair.go b/banyand/property/db/repair.go
index 9f80c5fa6..a5b967908 100644
--- a/banyand/property/db/repair.go
+++ b/banyand/property/db/repair.go
@@ -1018,6 +1018,11 @@ func (r *repairScheduler) verifyShouldExecuteBuildTree(t time.Time, triggerByCro
        if !triggerByCron {
                // if not triggered by cron, we need to check if the time is after the (last scheduled time + half of the interval)
                if r.buildTreeClock.Now().After(r.latestBuildTreeSchedule.Add(r.buildTreeScheduleInterval / 2)) {
+                       r.l.Debug().Msgf("the build tree is triggered by quick repair, "+
+                               "but the last build tree schedule time is %s, which is before the current time %s minus half of the interval %s(%s), "+
+                               "skipping this quick build tree",
+                               r.latestBuildTreeSchedule.Format(time.RFC3339), r.buildTreeClock.Now().Format(time.RFC3339),
+                               (r.buildTreeScheduleInterval / 2).String(), r.latestBuildTreeSchedule.Add(r.buildTreeScheduleInterval/2).String())
                        return false
                }
        } else {
diff --git a/banyand/property/db/repair_gossip_test.go b/banyand/property/db/repair_gossip_test.go
index e745aabc0..20f428f9f 100644
--- a/banyand/property/db/repair_gossip_test.go
+++ b/banyand/property/db/repair_gossip_test.go
@@ -383,7 +383,9 @@ func startEachNode(ctrl *gomock.Controller, node node, groups []group) *nodeCont
 
        ports, err := test.AllocateFreePorts(1)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       messenger := gossip.NewMessengerWithoutMetadata(observability.NewBypassRegistry(), ports[0])
+       messenger := gossip.NewMessengerWithoutMetadata("property-repair",
+               func(n *databasev1.Node) string { return n.PropertyRepairGossipGrpcAddress },
+               observability.NewBypassRegistry(), ports[0])
        addr := fmt.Sprintf("127.0.0.1:%d", ports[0])
        result.nodeID = addr
        err = messenger.Validate()
diff --git a/banyand/property/gossip/client.go b/banyand/property/gossip/client.go
index 5342e304b..6858bfce0 100644
--- a/banyand/property/gossip/client.go
+++ b/banyand/property/gossip/client.go
@@ -133,7 +133,7 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) {
                s.log.Warn().Msg("invalid metadata type")
                return
        }
-       address := node.PropertyRepairGossipGrpcAddress
+       address := s.addressExtractor(node)
        if address == "" {
                s.log.Warn().Stringer("node", node).Msg("node does not have gossip address, skipping registration")
                return
diff --git a/banyand/property/gossip/server.go b/banyand/property/gossip/server.go
index 97b9d7f7f..cff25034f 100644
--- a/banyand/property/gossip/server.go
+++ b/banyand/property/gossip/server.go
@@ -156,19 +156,17 @@ func (q *protocolHandler) Propagation(_ context.Context, request *propertyv1.Pro
 
 func (q *protocolHandler) propagation0(_ context.Context, request *propertyv1.PropagationRequest, tracer Trace) (resp *propertyv1.PropagationResponse, err error) {
        span := tracer.CreateSpan(tracer.ActivateSpan(), "receive gossip message")
-       defer span.End()
        span.Tag(TraceTagGroupName, request.Group)
        span.Tag(TraceTagShardID, fmt.Sprintf("%d", request.ShardId))
        span.Tag(TraceTagOperateType, TraceTagOperateReceive)
        q.s.serverMetrics.totalReceived.Inc(1, request.Group)
        q.s.log.Debug().Stringer("request", request).Msg("received property repair gossip message for propagation")
+       span.End()
 
-       if q.addToProcess(request, tracer) {
-               span.Tag("added_to_process", "true")
+       if q.addToProcess(request, tracer, span) {
                q.s.serverMetrics.totalAddProcessed.Inc(1, request.Group)
                q.s.log.Debug().Msgf("add the propagation request to the process")
        } else {
-               span.Tag("added_to_process", "false")
                q.s.serverMetrics.totalSkipProcess.Inc(1, request.Group)
                q.s.log.Debug().Msgf("propagation request discarded")
        }
@@ -304,7 +302,7 @@ func (q *protocolHandler) contextIsDone(ctx context.Context) bool {
        }
 }
 
-func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest, tracer Trace) bool {
+func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest, tracer Trace, span Span) bool {
        q.mu.Lock()
        defer q.mu.Unlock()
 
@@ -313,7 +311,7 @@ func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest, t
        handlingRequestData := &handlingRequest{
                PropagationRequest: request,
                tracer:             tracer,
-               parentSpan:         tracer.ActivateSpan(),
+               parentSpan:         span,
        }
        if !exist {
                groupShard = &groupWithShardPropagation{
@@ -391,10 +389,11 @@ func (s *service) newConnectionFromNode(n *databasev1.Node) (*grpc.ClientConn, e
        if err != nil {
                return nil, fmt.Errorf("failed to get client transport credentials: %w", err)
        }
-       conn, err := grpc.NewClient(n.PropertyRepairGossipGrpcAddress, append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...)
-       s.log.Debug().Str("address", n.PropertyRepairGossipGrpcAddress).Msg("starting to create gRPC client connection to node")
+       address := s.addressExtractor(n)
+       conn, err := grpc.NewClient(address, append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...)
+       s.log.Debug().Str("address", address).Msg("starting to create gRPC client connection to node")
        if err != nil {
-               return nil, fmt.Errorf("failed to create gRPC client connection to node %s: %w", n.PropertyRepairGossipGrpcAddress, err)
+               return nil, fmt.Errorf("failed to create gRPC client connection to node %s: %w", address, err)
        }
        return conn, nil
 }
diff --git a/banyand/property/gossip/service.go b/banyand/property/gossip/service.go
index 1a11276bb..962a455e9 100644
--- a/banyand/property/gossip/service.go
+++ b/banyand/property/gossip/service.go
@@ -47,12 +47,13 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+// AddressExtractor extracts the gossip gRPC address from a node.
+type AddressExtractor func(*databasev1.Node) string
+
 var (
        errServerCert = errors.New("invalid server cert file")
        errServerKey  = errors.New("invalid server key file")
        errNoAddr     = errors.New("no address")
-
-       serverScope = observability.RootScope.SubScope("property_repair_gossip_server")
 )
 
 const (
@@ -63,6 +64,7 @@ const (
 
 type service struct {
        schema.UnimplementedOnInitHandler
+       addressExtractor    AddressExtractor
        pipeline            queue.Client
        metadata            metadata.Repo
        creds               credentials.TransportCredentials
@@ -76,6 +78,7 @@ type service struct {
        protocolHandler     *protocolHandler
        registered          map[string]*databasev1.Node
        traceSpanNotified   *int32
+       prefix              string
        caCertPath          string
        host                string
        addr                string
@@ -99,8 +102,13 @@ type service struct {
 }
 
 // NewMessenger creates a new instance of Messenger for gossip propagation communication between nodes.
-func NewMessenger(omr observability.MetricsRegistry, metadata metadata.Repo, pipeline queue.Client) Messenger {
+func NewMessenger(prefix string, addressExtractor AddressExtractor,
+       omr observability.MetricsRegistry, metadata metadata.Repo, pipeline queue.Client, defaultPort uint32,
+) Messenger {
+       serverScope := observability.RootScope.SubScope(prefix + "_gossip_server")
        return &service{
+               prefix:           prefix,
+               addressExtractor: addressExtractor,
                metadata:         metadata,
                omr:              omr,
                closer:           run.NewCloser(1),
@@ -112,15 +120,13 @@ func NewMessenger(omr observability.MetricsRegistry, metadata metadata.Repo, pip
                registered:       make(map[string]*databasev1.Node),
                scheduleInterval: time.Hour * 2,
                sel:              node.NewRoundRobinSelector("", metadata),
-               port:             17932,
+               port:             defaultPort,
        }
 }
 
 // NewMessengerWithoutMetadata creates a new instance of Messenger without metadata.
-func NewMessengerWithoutMetadata(omr observability.MetricsRegistry, port int) Messenger {
-       messenger := NewMessenger(omr, nil, nil)
-       messenger.(*service).port = uint32(port)
-       return messenger
+func NewMessengerWithoutMetadata(prefix string, addressExtractor AddressExtractor, omr observability.MetricsRegistry, port int) Messenger {
+       return NewMessenger(prefix, addressExtractor, omr, nil, nil, uint32(port))
 }
 
 func (s *service) PreRun(ctx context.Context) error {
@@ -130,13 +136,14 @@ func (s *service) PreRun(ctx context.Context) error {
        }
        node := val.(common.Node)
        s.nodeID = node.NodeID
-       s.log = logger.GetLogger("gossip-messenger")
+       s.log = logger.GetLogger(s.prefix + "-gossip-messenger")
        s.listeners = make([]MessageListener, 0)
-       s.serverMetrics = newServerMetrics(s.omr.With(serverScope))
+       metricsScope := observability.RootScope.SubScope(s.prefix + "_gossip_server")
+       s.serverMetrics = newServerMetrics(s.omr.With(metricsScope))
        if s.metadata != nil {
                s.sel.OnInit([]schema.Kind{schema.KindGroup})
-               s.metadata.RegisterHandler("property-repair-nodes", schema.KindNode, s)
-               s.metadata.RegisterHandler("property-repair-groups", schema.KindGroup, s)
+               s.metadata.RegisterHandler(s.prefix+"-nodes", schema.KindNode, s)
+               s.metadata.RegisterHandler(s.prefix+"-groups", schema.KindGroup, s)
                if err := s.initTracing(ctx); err != nil {
                        s.log.Warn().Err(err).Msg("failed to init internal trace stream")
                }
@@ -151,7 +158,7 @@ func (s *service) GetServerPort() *uint32 {
 }
 
 func (s *service) Name() string {
-       return "gossip-messenger"
+       return s.prefix + "-gossip-messenger"
 }
 
 func (s *service) Role() databasev1.Role {
@@ -159,18 +166,18 @@ func (s *service) Role() databasev1.Role {
 }
 
 func (s *service) FlagSet() *run.FlagSet {
-       fs := run.NewFlagSet("gossip-messenger")
+       fs := run.NewFlagSet(s.prefix + "-gossip-messenger")
 
-       fs.VarP(&s.maxRecvMsgSize, "property-repair-gossip-grpc-max-recv-msg-size", "", "the size of max receiving message")
-       fs.StringVar(&s.host, "property-repair-gossip-grpc-host", "", "the host of banyand listens")
-       fs.Uint32Var(&s.port, "property-repair-gossip-grpc-port", s.port, "the port of banyand listens")
+       fs.VarP(&s.maxRecvMsgSize, s.prefix+"-gossip-grpc-max-recv-msg-size", "", "the size of max receiving message")
+       fs.StringVar(&s.host, s.prefix+"-gossip-grpc-host", "", "the host of banyand listens")
+       fs.Uint32Var(&s.port, s.prefix+"-gossip-grpc-port", s.port, "the port of banyand listens")
 
-       fs.BoolVar(&s.tls, "property-repair-gossip-grpc-tls", false, "connection uses TLS if true, else plain TCP")
-       fs.StringVar(&s.certFile, "property-repair-gossip-grpc-server-cert-file", "", "the TLS cert file")
-       fs.StringVar(&s.keyFile, "property-repair-gossip-grpc-server-key-file", "", "the TLS key file")
-       fs.StringVar(&s.caCertPath, "property-repair-gossip-client-ca-cert", "", "Path to the CA certificate for gossip client TLS communication.")
-       fs.DurationVar(&s.totalTimeout, "property-repair-gossip-total-timeout", defaultTotalTimeout, "the total timeout for gossip propagation")
-       fs.BoolVar(&s.traceLogEnabled, "property-repair-gossip-trace-log", true, "enable trace log")
+       fs.BoolVar(&s.tls, s.prefix+"-gossip-grpc-tls", false, "connection uses TLS if true, else plain TCP")
+       fs.StringVar(&s.certFile, s.prefix+"-gossip-grpc-server-cert-file", "", "the TLS cert file")
+       fs.StringVar(&s.keyFile, s.prefix+"-gossip-grpc-server-key-file", "", "the TLS key file")
+       fs.StringVar(&s.caCertPath, s.prefix+"-gossip-client-ca-cert", "", "Path to the CA certificate for gossip client TLS communication.")
+       fs.DurationVar(&s.totalTimeout, s.prefix+"-gossip-total-timeout", defaultTotalTimeout, "the total timeout for gossip propagation")
+       fs.BoolVar(&s.traceLogEnabled, s.prefix+"-gossip-trace-log", true, "enable trace log")
        return fs
 }
 
@@ -181,9 +188,6 @@ func (s *service) Validate() error {
        }
 
        // server side validation
-       if s.port == 0 {
-               s.port = 17932
-       }
        s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 10))
        if s.addr == ":" {
                return errNoAddr
diff --git a/banyand/property/gossip/service_test.go b/banyand/property/gossip/service_test.go
index eebe86773..dc747e9a8 100644
--- a/banyand/property/gossip/service_test.go
+++ b/banyand/property/gossip/service_test.go
@@ -183,7 +183,9 @@ func startNodes(count int) []*nodeContext {
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
                // starting gossip messenger
-               messenger := NewMessengerWithoutMetadata(observability.NewBypassRegistry(), ports[0])
+               messenger := NewMessengerWithoutMetadata("property-repair",
+                       func(n *databasev1.Node) string { return n.PropertyRepairGossipGrpcAddress },
+                       observability.NewBypassRegistry(), ports[0])
                gomega.Expect(messenger).NotTo(gomega.BeNil())
                addr := fmt.Sprintf("127.0.0.1:%d", ports[0])
                messenger.(run.PreRunner).PreRun(context.WithValue(context.Background(), common.ContextNodeKey, common.Node{
diff --git a/banyand/property/gossip/trace.go b/banyand/property/gossip/trace.go
index 6f5ed6da6..627eeae30 100644
--- a/banyand/property/gossip/trace.go
+++ b/banyand/property/gossip/trace.go
@@ -145,7 +145,9 @@ func (s *service) initTracing(ctx context.Context) error {
                ModRevision: dbStream.GetMetadata().GetModRevision(),
        }
        s.traceStreamSelector = node.NewRoundRobinSelector(data.TopicStreamWrite.String(), s.metadata)
-       s.pipeline.Register(data.TopicStreamWrite, s)
+       if s.pipeline != nil {
+               s.pipeline.Register(data.TopicStreamWrite, s)
+       }
        return nil
 }
 
@@ -174,7 +176,7 @@ func (s *service) initInternalTraceGroup(ctx context.Context) error {
 func (s *service) savingTracingSpans() (err error) {
        spans := s.readAllReadySendTraceSpan()
        s.log.Debug().Int("spans", len(spans)).Msg("ready to save trace spans to storage")
-       if len(spans) == 0 {
+       if len(spans) == 0 || s.pipeline == nil {
                return nil
        }
        ctx := context.Background()
diff --git a/banyand/property/service.go b/banyand/property/service.go
index 60d52d0b5..5cb66b011 100644
--- a/banyand/property/service.go
+++ b/banyand/property/service.go
@@ -249,6 +249,8 @@ func NewService(metadata metadata.Repo, pipeline queue.Server, pipelineClient qu
                pm:       pm,
                closer:   run.NewCloser(0),
 
-               gossipMessenger: gossip.NewMessenger(omr, metadata, pipelineClient),
+               gossipMessenger: gossip.NewMessenger("property-repair",
+                       func(n *databasev1.Node) string { return n.PropertyRepairGossipGrpcAddress },
+                       omr, metadata, pipelineClient, 17932),
        }, nil
 }
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 1dc98a055..38e764d68 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -168,6 +168,7 @@ func (s *server) PreRun(ctx context.Context) error {
 
                PropertyRepairGossipGrpcAddress: node.PropertyGossipGrpcAddress,
                PropertySchemaGrpcAddress:       node.PropertySchemaGrpcAddress,
+               PropertySchemaGossipGrpcAddress: node.PropertySchemaGossipGrpcAddress,
        }
 
        return nil
diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go
index 177e7fd95..e3ecb718a 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -645,7 +645,9 @@ var _ = Describe("Property Cluster background Repair Operation", func() {
                node1ID = fmt.Sprintf("127.0.0.1:%s", node1Port)
                node2ID = fmt.Sprintf("127.0.0.1:%s", node2Port)
 
-               messenger = gossip.NewMessengerWithoutMetadata(observability.NewBypassRegistry(), 9999)
+               messenger = gossip.NewMessengerWithoutMetadata("property-repair",
+                       func(n *databasev1.Node) string { return n.PropertyRepairGossipGrpcAddress },
+                       observability.NewBypassRegistry(), 9999)
                messenger.Validate()
                err = messenger.PreRun(context.WithValue(context.Background(), common.ContextNodeKey, common.Node{
                        NodeID: "not-exist",
@@ -758,6 +760,7 @@ var _ = Describe("Property Cluster Resilience with 5 Data Nodes", func() {
                for i := 0; i < nodeCount; i++ {
                        By(fmt.Sprintf("Starting data node %d", i))
                        nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] = setup.DataNodeFromDataDir(ep, nodeDirs[i],
+                               "--logging-level=debug",
                                "--property-repair-enabled=true", "--property-repair-quick-build-tree-time=1s",
                                "--property-repair-build-tree-cron=@every 2s")
                        // Update node ID to use 127.0.0.1
@@ -774,7 +777,9 @@ var _ = Describe("Property Cluster Resilience with 5 Data Nodes", func() {
                defUITemplateWithSchema(rootCmd, addr, 1, nodeCount)
 
                // Setup gossip messenger
-               messenger = gossip.NewMessengerWithoutMetadata(observability.NewBypassRegistry(), 9999)
+               messenger = gossip.NewMessengerWithoutMetadata("property-repair",
+                       func(n *databasev1.Node) string { return n.PropertyRepairGossipGrpcAddress },
+                       observability.NewBypassRegistry(), 9999)
                messenger.Validate()
                err = messenger.PreRun(context.WithValue(context.Background(), common.ContextNodeKey, common.Node{
                        NodeID: "test-client",
@@ -847,6 +852,7 @@ var _ = Describe("Property Cluster Resilience with 5 Data Nodes", func() {
                for i := 0; i < closedNodeCount; i++ {
                        GinkgoWriter.Printf("Restarting node %d\n", i)
                        nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] = setup.DataNodeFromDataDir(ep, nodeDirs[i],
+                               "--logging-level=debug",
                                "--property-repair-enabled=true", "--property-repair-quick-build-tree-time=1s",
                                "--property-repair-build-tree-cron=@every 2s")
                        // Update node ID to use 127.0.0.1
@@ -1122,5 +1128,5 @@ func waitForRepairTreeRegeneration(nodeDirs []string, group string, beforeTime t
                        }
                }
                return allRegenerated
-       }, flags.EventuallyTimeout).Should(BeTrue(), "All nodes should regenerate repair tree after data write")
+       }, time.Minute).Should(BeTrue(), "All nodes should regenerate repair tree after data write")
 }
diff --git a/docs/api-reference.md b/docs/api-reference.md
index de05b1c49..650a060bc 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2318,6 +2318,7 @@ Service is the service for the API
 | labels | [Node.LabelsEntry](#banyandb-database-v1-Node-LabelsEntry) | repeated | labels is a set of key-value pairs to describe the node. |
 | property_repair_gossip_grpc_address | [string](#string) |  |  |
 | property_schema_grpc_address | [string](#string) |  |  |
+| property_schema_gossip_grpc_address | [string](#string) |  |  |
 
 
 
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 6655f6b37..2403c7c08 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -54,6 +54,7 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
        pm := protector.NewMemory(metricSvc)
        pipeline := sub.NewServer(metricSvc)
        propertyStreamPipeline := queue.Local()
+       metaSvc.SetPropertyPipelineClient(propertyStreamPipeline)
        propertySvc, err := property.NewService(metaSvc, pipeline, propertyStreamPipeline, metricSvc, pm)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate property service")
@@ -103,7 +104,7 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
                Version: version.Build(),
                Short:   "Run as the data server",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
-                       node, err := common.GenerateNode(pipeline.GetPort(), nil, propertySvc.GetGossIPGrpcPort(), metaSvc.GetSchemaServerPort())
+                       node, err := common.GenerateNode(pipeline.GetPort(), nil, propertySvc.GetGossIPGrpcPort(), metaSvc.GetSchemaServerPort(), metaSvc.GetSchemaGossipPort())
                        if err != nil {
                                return err
                        }
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index bf6ce05a8..69756ef83 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -150,7 +150,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                                        sel.SetNodeSelector(ls)
                                }
                        }
-                       node, err := common.GenerateNode(internalPipeline.GetPort(), httpServer.GetPort(), nil, nil)
+                       node, err := common.GenerateNode(internalPipeline.GetPort(), httpServer.GetPort(), nil, nil, nil)
                        if err != nil {
                                return err
                        }
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 91a9c36d3..54ec7498d 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -111,7 +111,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
                Version: version.Build(),
                Short:   "Run as the standalone server",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
-                       nodeID, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort(), nil, nil)
+                       nodeID, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort(), nil, nil, nil)
                        if err != nil {
                                return err
                        }

Reply via email to