Copilot commented on code in PR #927: URL: https://github.com/apache/skywalking-banyandb/pull/927#discussion_r2671985647
########## test/integration/distributed/cluster_state/cluster_state_suite_test.go: ########## @@ -0,0 +1,141 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cluster_state_test + +import ( + "context" + "fmt" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/gleak" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/setup" +) + +func TestClusterState(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Distributed Get Cluster State Suite") +} + +var ( + dataConnection *grpc.ClientConn + liaisonConnection *grpc.ClientConn + srcDir string + deferFunc func() + goods []gleak.Goroutine + dataAddr string + liaisonAddr string + ep string +) + +var _ = SynchronizedBeforeSuite(func() []byte { + Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + })).To(Succeed()) + goods = gleak.Goroutines() + By("Starting etcd server") + ports, err := test.AllocateFreePorts(2) + Expect(err).NotTo(HaveOccurred()) + var spaceDef func() + srcDir, spaceDef, err = test.NewSpace() + Expect(err).NotTo(HaveOccurred()) + ep = fmt.Sprintf("http://127.0.0.1:%d", ports[0]) + server, err := embeddedetcd.NewServer( + embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), + embeddedetcd.RootDir(srcDir), + embeddedetcd.AutoCompactionMode("periodic"), + embeddedetcd.AutoCompactionRetention("1h"), + embeddedetcd.QuotaBackendBytes(2*1024*1024*1024), + ) + Expect(err).ShouldNot(HaveOccurred()) + <-server.ReadyNotify() + schemaRegistry, err := schema.NewEtcdSchemaRegistry( + schema.Namespace(metadata.DefaultNamespace), + schema.ConfigureServerEndpoints([]string{ep}), + ) + Expect(err).NotTo(HaveOccurred()) + defer schemaRegistry.Close() + By("Starting data node") + var closeDataNode0 func() + dataAddr, srcDir, closeDataNode0 = setup.DataNodeWithAddrAndDir(ep, "--property-repair-enabled=true") + By("Starting liaison node") + var closerLiaisonNode func() + liaisonAddr, closerLiaisonNode = setup.LiaisonNode(ep) + time.Sleep(flags.ConsistentlyTimeout) + deferFunc = func() { + closerLiaisonNode() + closeDataNode0() + _ = server.Close() + <-server.StopNotify() + spaceDef() + } + liaisonConnection, err = grpchelper.Conn(liaisonAddr, 10*time.Second, + grpc.WithTransportCredentials(insecure.NewCredentials())) + Expect(err).NotTo(HaveOccurred()) + dataConnection, err = grpchelper.Conn(dataAddr, 10*time.Second, + grpc.WithTransportCredentials(insecure.NewCredentials())) + Expect(err).NotTo(HaveOccurred()) + return nil +}, func(_ []byte) { +}) + +var _ = Describe("ClusterState API", func() { + It("Check cluster state", func() { + client := databasev1.NewClusterStateServiceClient(dataConnection) + state, err := client.GetClusterState(context.Background(), &databasev1.GetClusterStateRequest{}) + Expect(err).NotTo(HaveOccurred()) + Expect(state.GetRouteTables()).To(HaveKey("property")) + client = databasev1.NewClusterStateServiceClient(liaisonConnection) + state, err = client.GetClusterState(context.Background(), &databasev1.GetClusterStateRequest{}) + Expect(err).NotTo(HaveOccurred()) + Expect(state.GetRouteTables()).To(HaveKey("tire1")) + Expect(state.GetRouteTables()).To(HaveKey("tire2")) Review Comment: The expected map keys "tire1" and "tire2" should be "tier1" and "tier2" to match the correct spelling (tier refers to architectural layers/levels). This affects the test expectations and should be updated to align with the corrected spelling in the implementation. ```suggestion Expect(state.GetRouteTables()).To(HaveKey("tier1")) Expect(state.GetRouteTables()).To(HaveKey("tier2")) ``` ########## pkg/cmdsetup/liaison.go: ########## @@ -89,12 +90,17 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command { l.Fatal().Err(err).Msg("failed to initiate distributed query service") } + routeProviders := map[string]route.TableProvider{ + "tire1": tire1Client, + "tire2": tire2Client, + } Review Comment: The map keys "tire1" and "tire2" should be "tier1" and "tier2" (tier refers to architectural layers/levels, not rubber tires). These keys are used to identify different routing tiers in the liaison node. ########## banyand/backup/lifecycle/cluster_state.go: ########## @@ -0,0 +1,127 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "context" + "sync" + "time" + + "google.golang.org/protobuf/proto" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" +) + +// clusterStateManager manages the aggregated RouteTable snapshot from all lifecycle groups. +type clusterStateManager struct { + lastUpdateTime time.Time + aggregatedTable *databasev1.RouteTable + mu sync.RWMutex +} + +func (m *clusterStateManager) addRouteTable(rt *databasev1.RouteTable) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.aggregatedTable == nil { + m.aggregatedTable = &databasev1.RouteTable{ + Registered: []*databasev1.Node{}, + Active: []string{}, + Evictable: []string{}, + } + } + + // deduplicate registered nodes using map keyed by node name + nodeMap := make(map[string]*databasev1.Node) + for _, node := range m.aggregatedTable.Registered { + if node != nil && node.Metadata != nil { + nodeMap[node.Metadata.Name] = node + } + } + for _, node := range rt.Registered { + if node != nil && node.Metadata != nil { + nodeMap[node.Metadata.Name] = node + } + } + + // deduplicate active node names + activeSet := make(map[string]bool) + for _, name := range m.aggregatedTable.Active { + activeSet[name] = true + } + for _, name := range rt.Active { + activeSet[name] = true + } + + // Deduplicate evictable node names Review Comment: Comment capitalization is inconsistent. Line 62 uses lowercase "deduplicate" while line 71 uses uppercase "Deduplicate". For consistency, both comments should use the same capitalization style (preferably lowercase to match the Go convention for mid-sentence comments). ```suggestion // deduplicate evictable node names ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
