This is an automated email from the ASF dual-hosted git repository.

butterbright pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new e9765061 Check Unregistered Nodes in Background (#466)
e9765061 is described below

commit e9765061fe251b7c23bc1b715c56341f8ff3cd39
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Tue Jun 11 09:22:27 2024 +0800

    Check Unregistered Nodes in Background (#466)
    
    * Check unregistered nodes in background
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
    
    * Update CHANGES.md
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
    
    ---------
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                            |   4 ++
 banyand/queue/pub/client.go           | 118 +++++++++++++++++++++++++++-------
 banyand/queue/pub/client_test.go      |  28 ++++++--
 banyand/queue/pub/pub.go              |   5 +-
 test/stress/istio/istio_suite_test.go |   1 +
 5 files changed, 125 insertions(+), 31 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index de217bb2..731c2316 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -4,6 +4,10 @@ Release Notes.
 
 ## 0.7.0
 
+### Features
+
+- Check unregistered nodes in background.
+
 ### Bugs
 
 - Fix the filtering of stream in descending order by timestamp.
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index b82dbc67..1ebeb463 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -97,9 +97,27 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
        p.mu.Lock()
        defer p.mu.Unlock()
 
-       p.registered[name] = struct{}{}
+       if n, ok := p.registered[name]; ok {
+               if n.GrpcAddress == address {
+                       return
+               }
+               if en, ok := p.evictable[name]; ok {
+                       close(en.c)
+                       delete(p.evictable, name)
+                       p.log.Info().Str("node", name).Str("status", 
p.dump()).Msg("node is removed from evict queue by the new gRPC address updated 
event")
+               }
+
+               if client, ok := p.active[name]; ok {
+                       _ = client.conn.Close()
+                       delete(p.active, name)
+                       if p.handler != nil {
+                               p.handler.OnDelete(client.md)
+                       }
+                       p.log.Info().Str("status", p.dump()).Str("node", 
name).Msg("node is removed from active queue by the new gRPC address updated 
event")
+               }
+               p.registered[name] = node
+       }
 
-       // If the client already exists, just return
        if _, ok := p.active[name]; ok {
                return
        }
@@ -112,7 +130,7 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
                return
        }
 
-       if !p.checkClient(conn, md) {
+       if !p.checkClientHealthAndReconnect(conn, md) {
                p.log.Info().Str("status", p.dump()).Stringer("node", 
node).Msg("node is unhealthy, move it to evict queue")
                return
        }
@@ -149,17 +167,64 @@ func (p *pub) OnDelete(md schema.Metadata) {
                return
        }
 
-       if client, ok := p.active[name]; ok && !p.healthCheck(node, 
client.conn) {
-               _ = client.conn.Close()
-               delete(p.active, name)
-               if p.handler != nil {
-                       p.handler.OnDelete(md)
+       if client, ok := p.active[name]; ok {
+               if p.removeNodeIfUnhealthy(md, node, client) {
+                       p.log.Info().Str("status", p.dump()).Stringer("node", 
node).Msg("remove node from active queue by delete event")
+                       return
+               }
+               if !p.closer.AddRunning() {
+                       return
                }
-               p.log.Info().Str("status", p.dump()).Stringer("node", 
node).Msg("node is removed from active queue by delete event")
+               go func() {
+                       defer p.closer.Done()
+                       backoff := initBackoff
+                       var elapsed time.Duration
+                       for {
+                               select {
+                               case <-time.After(backoff):
+                                       if func() bool {
+                                               elapsed += backoff
+                                               p.mu.Lock()
+                                               defer p.mu.Unlock()
+                                               if p.removeNodeIfUnhealthy(md, 
node, client) {
+                                                       
p.log.Info().Str("status", p.dump()).Stringer("node", node).Dur("after", 
elapsed).Msg("remove node from active queue by delete event")
+                                                       return true
+                                               }
+                                               if _, ok := p.registered[name]; 
ok {
+                                                       // The client has been 
added back to registered clients map, just return
+                                                       return true
+                                               }
+                                               return false
+                                       }() {
+                                               return
+                                       }
+                               case <-p.closer.CloseNotify():
+                                       return
+                               }
+                               if backoff < maxBackoff {
+                                       backoff *= 
time.Duration(backoffMultiplier)
+                               } else {
+                                       backoff = maxBackoff
+                               }
+                       }
+               }()
+       }
+}
+
+func (p *pub) removeNodeIfUnhealthy(md schema.Metadata, node *databasev1.Node, 
client *client) bool {
+       if p.healthCheck(node, client.conn) {
+               return false
+       }
+       _ = client.conn.Close()
+       name := node.Metadata.GetName()
+       delete(p.active, name)
+       if p.handler != nil {
+               p.handler.OnDelete(md)
        }
+       return true
 }
 
-func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool {
+func (p *pub) checkClientHealthAndReconnect(conn *grpc.ClientConn, md 
schema.Metadata) bool {
        node, ok := md.Spec.(*databasev1.Node)
        if !ok {
                logger.Panicf("failed to cast node spec")
@@ -185,20 +250,21 @@ func (p *pub) checkClient(conn *grpc.ClientConn, md 
schema.Metadata) bool {
                        case <-time.After(backoff):
                                connEvict, errEvict := 
grpc.Dial(node.GrpcAddress, 
grpc.WithTransportCredentials(insecure.NewCredentials()), 
grpc.WithDefaultServiceConfig(retryPolicy))
                                if errEvict == nil && p.healthCheck(en.n, 
connEvict) {
-                                       p.mu.Lock()
-                                       defer p.mu.Unlock()
-                                       if _, ok := p.evictable[name]; !ok {
-                                               // The client has been removed 
from evict clients map, just return
-                                               return
-                                       }
-                                       c := 
clusterv1.NewServiceClient(connEvict)
-                                       p.active[name] = &client{conn: 
connEvict, client: c, md: md}
-                                       if p.handler != nil {
-                                               p.handler.OnAddOrUpdate(md)
-                                       }
-                                       delete(p.evictable, name)
-                                       p.log.Info().Stringer("node", 
en.n).Msg("node is healthy, move it back to active queue")
-                                       return
+                                       func() {
+                                               p.mu.Lock()
+                                               defer p.mu.Unlock()
+                                               if _, ok := p.evictable[name]; 
!ok {
+                                                       // The client has been 
removed from evict clients map, just return
+                                                       return
+                                               }
+                                               c := 
clusterv1.NewServiceClient(connEvict)
+                                               p.active[name] = &client{conn: 
connEvict, client: c, md: md}
+                                               if p.handler != nil {
+                                                       
p.handler.OnAddOrUpdate(md)
+                                               }
+                                               delete(p.evictable, name)
+                                               p.log.Info().Stringer("node", 
en.n).Msg("node is healthy, move it back to active queue")
+                                       }()
                                }
                                if errEvict != nil {
                                        _ = connEvict.Close()
@@ -209,6 +275,8 @@ func (p *pub) checkClient(conn *grpc.ClientConn, md 
schema.Metadata) bool {
                                p.log.Error().Err(errEvict).Msgf("failed to 
re-connect to grpc server after waiting for %s", backoff)
                        case <-en.c:
                                return
+                       case <-p.closer.CloseNotify():
+                               return
                        }
                        if backoff < maxBackoff {
                                backoff *= time.Duration(backoffMultiplier)
@@ -250,7 +318,7 @@ func (p *pub) failover(node string) {
                return
        }
 
-       if client, ok := p.active[node]; ok && !p.checkClient(client.conn, 
client.md) {
+       if client, ok := p.active[node]; ok && 
!p.checkClientHealthAndReconnect(client.conn, client.md) {
                _ = client.conn.Close()
                delete(p.active, node)
                if p.handler != nil {
diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go
index 2ced4841..f44c54f4 100644
--- a/banyand/queue/pub/client_test.go
+++ b/banyand/queue/pub/client_test.go
@@ -92,14 +92,34 @@ var _ = ginkgo.Describe("publish clients 
register/unregister", func() {
                p.OnDelete(node1)
                verifyClients(p, 0, 0, 1, 2)
        })
+
+       ginkgo.It("should be removed eventually", func() {
+               addr1 := getAddress()
+               node1 := getDataNode("node1", addr1)
+               p := newPub()
+               defer p.GracefulStop()
+               closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
+               p.OnAddOrUpdate(node1)
+               verifyClients(p, 1, 0, 1, 0)
+               p.OnDelete(node1)
+               verifyClients(p, 1, 0, 1, 0)
+               closeFn()
+               gomega.Eventually(func(g gomega.Gomega) {
+                       verifyClientsWithGomega(g, p, 0, 0, 1, 1)
+               }, flags.EventuallyTimeout).Should(gomega.Succeed())
+       })
 })
 
 func verifyClients(p *pub, active, evict, onAdd, onDelete int) {
+       verifyClientsWithGomega(gomega.Default, p, active, evict, onAdd, 
onDelete)
+}
+
+func verifyClientsWithGomega(g gomega.Gomega, p *pub, active, evict, onAdd, 
onDelete int) {
        p.mu.RLock()
        defer p.mu.RUnlock()
-       gomega.Expect(p.active).Should(gomega.HaveLen(active))
-       gomega.Expect(p.evictable).Should(gomega.HaveLen(evict))
+       g.Expect(len(p.active)).Should(gomega.Equal(active))
+       g.Expect(len(p.evictable)).Should(gomega.Equal(evict))
        h := p.handler.(*mockHandler)
-       gomega.Expect(h.addOrUpdateCount).Should(gomega.Equal(onAdd))
-       gomega.Expect(h.deleteCount).Should(gomega.Equal(onDelete))
+       g.Expect(h.addOrUpdateCount).Should(gomega.Equal(onAdd))
+       g.Expect(h.deleteCount).Should(gomega.Equal(onDelete))
 }
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 2acaf6ff..acada3b0 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -34,6 +34,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/data"
        clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/queue"
@@ -52,7 +53,7 @@ type pub struct {
        metadata   metadata.Repo
        handler    schema.EventHandler
        log        *logger.Logger
-       registered map[string]struct{}
+       registered map[string]*databasev1.Node
        active     map[string]*client
        evictable  map[string]evictNode
        closer     *run.Closer
@@ -189,7 +190,7 @@ func New(metadata metadata.Repo) queue.Client {
                metadata:   metadata,
                active:     make(map[string]*client),
                evictable:  make(map[string]evictNode),
-               registered: make(map[string]struct{}),
+               registered: make(map[string]*databasev1.Node),
                closer:     run.NewCloser(1),
        }
 }
diff --git a/test/stress/istio/istio_suite_test.go 
b/test/stress/istio/istio_suite_test.go
index d77078fc..d57e6bc8 100644
--- a/test/stress/istio/istio_suite_test.go
+++ b/test/stress/istio/istio_suite_test.go
@@ -76,6 +76,7 @@ var _ = g.Describe("Istio", func() {
                        time.Sleep(time.Minute)
                        closerServerFunc()
                        helpers.PrintDiskUsage(measurePath, 5, 0)
+                       time.Sleep(10 * time.Second)
                        deferFn()
                })
                gomega.Eventually(helpers.HealthCheck(addr, 10*time.Second, 
10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),

Reply via email to