Copilot commented on code in PR #927:
URL: 
https://github.com/apache/skywalking-banyandb/pull/927#discussion_r2671545649


##########
pkg/cmdsetup/liaison.go:
##########
@@ -89,12 +90,17 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                l.Fatal().Err(err).Msg("failed to initiate distributed query 
service")
        }
 
+       routeProviders := map[string]route.TableProvider{
+               "tire1": tire1Client,
+               "tire2": tire2Client,

Review Comment:
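   The word "tire" should be spelled "tier". This appears to be a typo 
referring to architectural tiers (tier 1, tier 2), not tires. Because these 
keys surface as `route_tables` keys in the GetClusterState response, the 
rename should be applied consistently to the integration test and the API docs.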



##########
test/integration/distributed/cluster_state/cluster_state_suite_test.go:
##########
@@ -0,0 +1,141 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cluster_state_test
+
+import (
+       "context"
+       "fmt"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+func TestClusterState(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Distributed Get Cluster State Suite")
+}
+
+var (
+       dataConnection    *grpc.ClientConn
+       liaisonConnection *grpc.ClientConn
+       srcDir            string
+       deferFunc         func()
+       goods             []gleak.Goroutine
+       dataAddr          string
+       liaisonAddr       string
+       ep                string
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       goods = gleak.Goroutines()
+       By("Starting etcd server")
+       ports, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+       var spaceDef func()
+       srcDir, spaceDef, err = test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       ep = fmt.Sprintf("http://127.0.0.1:%d";, ports[0])
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}),
+               embeddedetcd.RootDir(srcDir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
+       Expect(err).ShouldNot(HaveOccurred())
+       <-server.ReadyNotify()
+       schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+               schema.Namespace(metadata.DefaultNamespace),
+               schema.ConfigureServerEndpoints([]string{ep}),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       defer schemaRegistry.Close()
+       By("Starting data node")
+       var closeDataNode0 func()
+       dataAddr, srcDir, closeDataNode0 = setup.DataNodeWithAddrAndDir(ep, 
"--property-repair-enabled=true")
+       By("Starting liaison node")
+       var closerLiaisonNode func()
+       liaisonAddr, closerLiaisonNode = setup.LiaisonNode(ep)
+       time.Sleep(flags.ConsistentlyTimeout)
+       deferFunc = func() {
+               closerLiaisonNode()
+               closeDataNode0()
+               _ = server.Close()
+               <-server.StopNotify()
+               spaceDef()
+       }
+       liaisonConnection, err = grpchelper.Conn(liaisonAddr, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(err).NotTo(HaveOccurred())
+       dataConnection, err = grpchelper.Conn(dataAddr, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(err).NotTo(HaveOccurred())
+       return nil
+}, func(_ []byte) {
+})
+
+var _ = Describe("ClusterState API", func() {
+       It("Check cluster state", func() {
+               client := 
databasev1.NewClusterStateServiceClient(dataConnection)
+               state, err := client.GetClusterState(context.Background(), 
&databasev1.GetClusterStateRequest{})
+               Expect(err).NotTo(HaveOccurred())
+               Expect(state.GetRouteTables()).To(HaveKey("property"))
+               client = 
databasev1.NewClusterStateServiceClient(liaisonConnection)
+               state, err = client.GetClusterState(context.Background(), 
&databasev1.GetClusterStateRequest{})
+               Expect(err).NotTo(HaveOccurred())
+               Expect(state.GetRouteTables()).To(HaveKey("tire1"))
+               Expect(state.GetRouteTables()).To(HaveKey("tire2"))

Review Comment:
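   The word "tire" should be spelled "tier". This appears to be a typo 
referring to architectural tiers (tier 1, tier 2), not tires. The keys asserted 
here must match the ones registered in `pkg/cmdsetup/liaison.go`, so both 
places need to be renamed together.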
   ```suggestion
                Expect(state.GetRouteTables()).To(HaveKey("tier1"))
                Expect(state.GetRouteTables()).To(HaveKey("tier2"))
   ```



##########
docs/api-reference.md:
##########
@@ -2636,6 +2641,47 @@ Type determine the index structure under the hood
 
 
 
+<a name="banyandb-database-v1-GetClusterStateRequest"></a>
+
+### GetClusterStateRequest
+
+
+
+
+
+
+
+<a name="banyandb-database-v1-GetClusterStateResponse"></a>
+
+### GetClusterStateResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| route_tables | 
[GetClusterStateResponse.RouteTablesEntry](#banyandb-database-v1-GetClusterStateResponse-RouteTablesEntry)
 | repeated | Liaison node: map&#39;s key could be &#34;tire1&#34; and 
&#34;tire2&#34;. tire1 route traffic between liaison nodes, tire2 spread data 
among data nodes Data node: map&#39;s key could be &#34;property&#34; for 
gossip. Lifecycle agent: map&#39;s key could be the next stage&#39;s name. |

Review Comment:
   The comment has spelling and grammar issues. "tire1" and "tire2" should be 
"tier1" and "tier2". Additionally, "tire1 route traffic" should be "tier1 
routes traffic" and "tire2 spread data" should be "tier2 spreads data" (both 
verbs need a third-person 's'), and a period is missing before "Data node:".
   ```suggestion
   | route_tables | 
[GetClusterStateResponse.RouteTablesEntry](#banyandb-database-v1-GetClusterStateResponse-RouteTablesEntry)
 | repeated | Liaison node: map&#39;s key could be &#34;tier1&#34; and 
&#34;tier2&#34;. tier1 routes traffic between liaison nodes, tier2 spreads data 
among data nodes. Data node: map&#39;s key could be &#34;property&#34; for 
gossip. Lifecycle agent: map&#39;s key could be the next stage&#39;s name. |
   ```



##########
api/proto/banyandb/database/v1/rpc.proto:
##########
@@ -684,3 +684,29 @@ message GetCurrentNodeResponse {
 service NodeQueryService {
   rpc GetCurrentNode(GetCurrentNodeRequest) returns (GetCurrentNodeResponse) {}
 }
+
+message GetClusterStateRequest {}
+
+// RouteTable represents a collection of nodes grouped by their health state.
+// It provides a view of nodes that are registered, actively healthy, and 
those being evicted.
+message RouteTable {
+  // registered contains all nodes that have been discovered and registered in 
this route.
+  repeated banyandb.database.v1.Node registered = 1;
+  // active contains node names (Node.Metadata.Name) that are currently 
healthy and can handle requests.
+  repeated string active = 2;
+  // evictable contains node names (Node.Metadata.Name) that are unhealthy and 
being retried before eviction.
+  repeated string evictable = 3;
+}
+
+message GetClusterStateResponse {
+  // Liaison node: map's key could be "tire1" and "tire2". tire1 route traffic 
between liaison nodes, tire2 spread data among data nodes

Review Comment:
   The comment has spelling and grammar issues. "tire1" and "tire2" should be 
"tier1" and "tier2" (architectural tiers, not tires). In addition, "tire1 route 
traffic" should be "tier1 routes traffic" and "tire2 spread data" should be 
"tier2 spreads data" (both verbs need a third-person 's').
   ```suggestion
     // Liaison node: map's key could be "tier1" and "tier2". tier1 routes 
traffic between liaison nodes, tier2 spreads data among data nodes
   ```



##########
banyand/backup/lifecycle/service.go:
##########
@@ -152,11 +265,125 @@ func (l *lifecycleService) Serve() run.StopNotify {
        })
        if err != nil {
                l.l.Error().Err(err).Msg("failed to register lifecycle 
migration schedule")
+               close(done)
                return done
        }
+
+       // Wait for either migration completion or server stop
+       go func() {
+               select {
+               case <-done:
+                       // Migration completed
+               case <-l.stopCh:
+                       // Server stopped
+                       close(done)
+               }
+       }()

Review Comment:
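   There's a potential race condition where the 'done' channel could be closed 
multiple times. When the scheduler reaches its maximum execution count (line 
261), it closes 'done'. If stopCh has been signaled first, the goroutine at line 
279 has already closed 'done', so the scheduler's close panics. If both cases 
are ready at the same time, select picks one at random, and the goroutine may 
close the already-closed channel. Either way, the second close panics. (The 
registration-failure path at line 268 returns before the goroutine is started, 
so it is not affected.) Consider using a sync.Once so the channel is closed 
only once, or restructure the logic so that a single owner closes it.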



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to