This is an automated email from the ASF dual-hosted git repository. butterbright pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new e9765061 Check Unregistered Nodes in Background (#466) e9765061 is described below commit e9765061fe251b7c23bc1b715c56341f8ff3cd39 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Tue Jun 11 09:22:27 2024 +0800 Check Unregistered Nodes in Background (#466) * Check unregistered nodes in background Signed-off-by: Gao Hongtao <hanahm...@gmail.com> * update changes.md Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --------- Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- CHANGES.md | 4 ++ banyand/queue/pub/client.go | 118 +++++++++++++++++++++++++++------- banyand/queue/pub/client_test.go | 28 ++++++-- banyand/queue/pub/pub.go | 5 +- test/stress/istio/istio_suite_test.go | 1 + 5 files changed, 125 insertions(+), 31 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index de217bb2..731c2316 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,10 @@ Release Notes. ## 0.7.0 +### Features + +- Check unregistered nodes in background. + ### Bugs - Fix the filtering of stream in descending order by timestamp. 
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go index b82dbc67..1ebeb463 100644 --- a/banyand/queue/pub/client.go +++ b/banyand/queue/pub/client.go @@ -97,9 +97,27 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) { p.mu.Lock() defer p.mu.Unlock() - p.registered[name] = struct{}{} + if n, ok := p.registered[name]; ok { + if n.GrpcAddress == address { + return + } + if en, ok := p.evictable[name]; ok { + close(en.c) + delete(p.evictable, name) + p.log.Info().Str("node", name).Str("status", p.dump()).Msg("node is removed from evict queue by the new gRPC address updated event") + } + + if client, ok := p.active[name]; ok { + _ = client.conn.Close() + delete(p.active, name) + if p.handler != nil { + p.handler.OnDelete(client.md) + } + p.log.Info().Str("status", p.dump()).Str("node", name).Msg("node is removed from active queue by the new gRPC address updated event") + } + p.registered[name] = node + } - // If the client already exists, just return if _, ok := p.active[name]; ok { return } @@ -112,7 +130,7 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) { return } - if !p.checkClient(conn, md) { + if !p.checkClientHealthAndReconnect(conn, md) { p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is unhealthy, move it to evict queue") return } @@ -149,17 +167,64 @@ func (p *pub) OnDelete(md schema.Metadata) { return } - if client, ok := p.active[name]; ok && !p.healthCheck(node, client.conn) { - _ = client.conn.Close() - delete(p.active, name) - if p.handler != nil { - p.handler.OnDelete(md) + if client, ok := p.active[name]; ok { + if p.removeNodeIfUnhealthy(md, node, client) { + p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("remove node from active queue by delete event") + return + } + if !p.closer.AddRunning() { + return } - p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is removed from active queue by delete event") + go func() { + defer p.closer.Done() + backoff := initBackoff + 
var elapsed time.Duration + for { + select { + case <-time.After(backoff): + if func() bool { + elapsed += backoff + p.mu.Lock() + defer p.mu.Unlock() + if p.removeNodeIfUnhealthy(md, node, client) { + p.log.Info().Str("status", p.dump()).Stringer("node", node).Dur("after", elapsed).Msg("remove node from active queue by delete event") + return true + } + if _, ok := p.registered[name]; ok { + // The client has been added back to registered clients map, just return + return true + } + return false + }() { + return + } + case <-p.closer.CloseNotify(): + return + } + if backoff < maxBackoff { + backoff *= time.Duration(backoffMultiplier) + } else { + backoff = maxBackoff + } + } + }() + } +} + +func (p *pub) removeNodeIfUnhealthy(md schema.Metadata, node *databasev1.Node, client *client) bool { + if p.healthCheck(node, client.conn) { + return false + } + _ = client.conn.Close() + name := node.Metadata.GetName() + delete(p.active, name) + if p.handler != nil { + p.handler.OnDelete(md) } + return true } -func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool { +func (p *pub) checkClientHealthAndReconnect(conn *grpc.ClientConn, md schema.Metadata) bool { node, ok := md.Spec.(*databasev1.Node) if !ok { logger.Panicf("failed to cast node spec") @@ -185,20 +250,21 @@ func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool { case <-time.After(backoff): connEvict, errEvict := grpc.Dial(node.GrpcAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy)) if errEvict == nil && p.healthCheck(en.n, connEvict) { - p.mu.Lock() - defer p.mu.Unlock() - if _, ok := p.evictable[name]; !ok { - // The client has been removed from evict clients map, just return - return - } - c := clusterv1.NewServiceClient(connEvict) - p.active[name] = &client{conn: connEvict, client: c, md: md} - if p.handler != nil { - p.handler.OnAddOrUpdate(md) - } - delete(p.evictable, name) - p.log.Info().Stringer("node", 
en.n).Msg("node is healthy, move it back to active queue") - return + func() { + p.mu.Lock() + defer p.mu.Unlock() + if _, ok := p.evictable[name]; !ok { + // The client has been removed from evict clients map, just return + return + } + c := clusterv1.NewServiceClient(connEvict) + p.active[name] = &client{conn: connEvict, client: c, md: md} + if p.handler != nil { + p.handler.OnAddOrUpdate(md) + } + delete(p.evictable, name) + p.log.Info().Stringer("node", en.n).Msg("node is healthy, move it back to active queue") + }() } if errEvict != nil { _ = connEvict.Close() @@ -209,6 +275,8 @@ func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool { p.log.Error().Err(errEvict).Msgf("failed to re-connect to grpc server after waiting for %s", backoff) case <-en.c: return + case <-p.closer.CloseNotify(): + return } if backoff < maxBackoff { backoff *= time.Duration(backoffMultiplier) @@ -250,7 +318,7 @@ func (p *pub) failover(node string) { return } - if client, ok := p.active[node]; ok && !p.checkClient(client.conn, client.md) { + if client, ok := p.active[node]; ok && !p.checkClientHealthAndReconnect(client.conn, client.md) { _ = client.conn.Close() delete(p.active, node) if p.handler != nil { diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go index 2ced4841..f44c54f4 100644 --- a/banyand/queue/pub/client_test.go +++ b/banyand/queue/pub/client_test.go @@ -92,14 +92,34 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { p.OnDelete(node1) verifyClients(p, 0, 0, 1, 2) }) + + ginkgo.It("should be removed eventually", func() { + addr1 := getAddress() + node1 := getDataNode("node1", addr1) + p := newPub() + defer p.GracefulStop() + closeFn := setup(addr1, codes.OK, 200*time.Millisecond) + p.OnAddOrUpdate(node1) + verifyClients(p, 1, 0, 1, 0) + p.OnDelete(node1) + verifyClients(p, 1, 0, 1, 0) + closeFn() + gomega.Eventually(func(g gomega.Gomega) { + verifyClientsWithGomega(g, p, 0, 0, 1, 1) + }, 
flags.EventuallyTimeout).Should(gomega.Succeed()) + }) }) func verifyClients(p *pub, active, evict, onAdd, onDelete int) { + verifyClientsWithGomega(gomega.Default, p, active, evict, onAdd, onDelete) +} + +func verifyClientsWithGomega(g gomega.Gomega, p *pub, active, evict, onAdd, onDelete int) { p.mu.RLock() defer p.mu.RUnlock() - gomega.Expect(p.active).Should(gomega.HaveLen(active)) - gomega.Expect(p.evictable).Should(gomega.HaveLen(evict)) + g.Expect(len(p.active)).Should(gomega.Equal(active)) + g.Expect(len(p.evictable)).Should(gomega.Equal(evict)) h := p.handler.(*mockHandler) - gomega.Expect(h.addOrUpdateCount).Should(gomega.Equal(onAdd)) - gomega.Expect(h.deleteCount).Should(gomega.Equal(onDelete)) + g.Expect(h.addOrUpdateCount).Should(gomega.Equal(onAdd)) + g.Expect(h.deleteCount).Should(gomega.Equal(onDelete)) } diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index 2acaf6ff..acada3b0 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -34,6 +34,7 @@ import ( "github.com/apache/skywalking-banyandb/api/data" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/queue" @@ -52,7 +53,7 @@ type pub struct { metadata metadata.Repo handler schema.EventHandler log *logger.Logger - registered map[string]struct{} + registered map[string]*databasev1.Node active map[string]*client evictable map[string]evictNode closer *run.Closer @@ -189,7 +190,7 @@ func New(metadata metadata.Repo) queue.Client { metadata: metadata, active: make(map[string]*client), evictable: make(map[string]evictNode), - registered: make(map[string]struct{}), + registered: make(map[string]*databasev1.Node), closer: run.NewCloser(1), } } diff --git a/test/stress/istio/istio_suite_test.go 
b/test/stress/istio/istio_suite_test.go index d77078fc..d57e6bc8 100644 --- a/test/stress/istio/istio_suite_test.go +++ b/test/stress/istio/istio_suite_test.go @@ -76,6 +76,7 @@ var _ = g.Describe("Istio", func() { time.Sleep(time.Minute) closerServerFunc() helpers.PrintDiskUsage(measurePath, 5, 0) + time.Sleep(10 * time.Second) deferFn() }) gomega.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),