This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch etcd
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit ad2fbece9f8d473ed8feb0fb487f19225e424236
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Apr 18 13:33:13 2025 +0000

    Fix the etcd watcher receiving historical node registration events
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                              |  1 +
 Makefile                                |  2 +-
 banyand/metadata/schema/etcd.go         | 10 ++--
 banyand/metadata/schema/schema.go       |  2 +-
 banyand/metadata/schema/watcher_test.go | 85 +++++++++++++++++++++++++++++----
 banyand/queue/pub/pub.go                |  6 ++-
 pkg/test/flags/flags.go                 | 13 +++++
 7 files changed, 100 insertions(+), 19 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a0cecbf9..0da6ead6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -12,6 +12,7 @@ Release Notes.
 ### Bug Fixes
 
 - Fix the deadlock issue when loading a closed segment.
+- Fix the issue that the etcd watcher gets the historical node registration 
events.
 
 ## 0.8.0
 
diff --git a/Makefile b/Makefile
index 0d6a1747..9a962cae 100644
--- a/Makefile
+++ b/Makefile
@@ -74,7 +74,7 @@ include scripts/build/ginkgo.mk
 test-ci: $(GINKGO) ## Run the unit tests in CI
        $(GINKGO) --race \
          -ldflags \
-         "-X 
github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X 
github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=error" \
+         "-X 
github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X 
github.com/apache/skywalking-banyandb/pkg/test/flags.consistentlyTimeout=10s -X 
github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=error" \
          $(TEST_CI_OPTS) \
          ./... 
 
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 41f66f1c..eebd6eef 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -172,7 +172,7 @@ func (e *etcdSchemaRegistry) RegisterHandler(name string, 
kind Kind, handler Eve
                return
        }
        for i := range kinds {
-               e.registerToWatcher(name, kinds[i], 0, handler)
+               e.registerToWatcher(name, kinds[i], -1, handler)
        }
 }
 
@@ -186,7 +186,7 @@ func (e *etcdSchemaRegistry) registerToWatcher(name string, 
kind Kind, revision
                return
        }
        e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering to 
a new watcher")
-       w := e.newWatcherWithRevision(name, kind, revision, 
CheckInterval(e.checkInterval))
+       w := e.NewWatcher(name, kind, revision, CheckInterval(e.checkInterval))
        w.AddHandler(handler)
        e.watchers[kind] = w
 }
@@ -578,11 +578,7 @@ func (e *etcdSchemaRegistry) revokeLease(lease 
*clientv3.LeaseGrantResponse) {
        }
 }
 
-func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, opts 
...WatcherOption) *watcher {
-       return e.newWatcherWithRevision(name, kind, 0, opts...)
-}
-
-func (e *etcdSchemaRegistry) newWatcherWithRevision(name string, kind Kind, 
revision int64, opts ...WatcherOption) *watcher {
+func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, revision 
int64, opts ...WatcherOption) *watcher {
        wc := watcherConfig{
                key:           e.prependNamespace(kind.key()),
                kind:          kind,
diff --git a/banyand/metadata/schema/schema.go 
b/banyand/metadata/schema/schema.go
index 472a34ed..d103536f 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -66,7 +66,7 @@ type Registry interface {
        Node
        Property
        RegisterHandler(string, Kind, EventHandler)
-       NewWatcher(string, Kind, ...WatcherOption) *watcher
+       NewWatcher(string, Kind, int64, ...WatcherOption) *watcher
        Register(context.Context, Metadata, bool) error
        Compact(context.Context, int64) error
        StartWatcher()
diff --git a/banyand/metadata/schema/watcher_test.go 
b/banyand/metadata/schema/watcher_test.go
index c2fa90dd..20c0b7e1 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -81,6 +81,7 @@ var _ = ginkgo.Describe("Watcher", func() {
                server    embeddedetcd.Server
                registry  schema.Registry
                defFn     func()
+               endpoints []string
        )
 
        ginkgo.BeforeEach(func() {
@@ -98,7 +99,7 @@ var _ = ginkgo.Describe("Watcher", func() {
                if err != nil {
                        panic("fail to find free ports")
                }
-               endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d";, 
ports[0])}
+               endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d";, 
ports[0])}
                server, err = embeddedetcd.NewServer(
                        embeddedetcd.ConfigureListener(endpoints, 
[]string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}),
                        embeddedetcd.RootDir(path))
@@ -172,7 +173,7 @@ var _ = ginkgo.Describe("Watcher", func() {
                }
 
                // Start the watcher
-               watcher := registry.NewWatcher("test", schema.KindMeasure)
+               watcher := registry.NewWatcher("test", schema.KindMeasure, 0)
                watcher.AddHandler(mockedObj)
                watcher.Start()
                ginkgo.DeferCleanup(func() {
@@ -191,7 +192,7 @@ var _ = ginkgo.Describe("Watcher", func() {
                }, flags.EventuallyTimeout).Should(gomega.BeTrue())
        })
        ginkgo.It("should handle watch events", func() {
-               watcher := registry.NewWatcher("test", schema.KindStream)
+               watcher := registry.NewWatcher("test", schema.KindStream, 0)
                watcher.AddHandler(mockedObj)
                watcher.Start()
                ginkgo.DeferCleanup(func() {
@@ -316,7 +317,7 @@ var _ = ginkgo.Describe("Watcher", func() {
                })
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
-               watcher := registry.NewWatcher("test", schema.KindMeasure, 
schema.CheckInterval(1*time.Second))
+               watcher := registry.NewWatcher("test", schema.KindMeasure, 0, 
schema.CheckInterval(1*time.Second))
                watcher.AddHandler(mockedObj)
                watcher.Start()
                ginkgo.DeferCleanup(func() {
@@ -331,7 +332,7 @@ var _ = ginkgo.Describe("Watcher", func() {
        })
 
        ginkgo.It("should detect deletions", func() {
-               watcher := registry.NewWatcher("test", schema.KindMeasure, 
schema.CheckInterval(1*time.Second))
+               watcher := registry.NewWatcher("test", schema.KindMeasure, 0, 
schema.CheckInterval(1*time.Second))
                watcher.AddHandler(mockedObj)
                watcher.Start()
                ginkgo.DeferCleanup(func() {
@@ -395,12 +396,12 @@ var _ = ginkgo.Describe("Watcher", func() {
 
                gomega.Eventually(func() int {
                        return int(mockedObj.deleteCalledNum.Load())
-               }, 5*time.Second).Should(gomega.Equal(1))
+               }, flags.EventuallyTimeout).Should(gomega.Equal(1))
                
gomega.Expect(mockedObj.Data()).NotTo(gomega.HaveKey(measureName))
        })
 
        ginkgo.It("should recover state after compaction", func() {
-               watcher := registry.NewWatcher("test", schema.KindMeasure, 
schema.CheckInterval(1*time.Hour))
+               watcher := registry.NewWatcher("test", schema.KindMeasure, 0, 
schema.CheckInterval(1*time.Hour))
                watcher.AddHandler(mockedObj)
                watcher.Start()
                ginkgo.DeferCleanup(func() {
@@ -487,6 +488,74 @@ var _ = ginkgo.Describe("Watcher", func() {
 
                gomega.Eventually(func() int {
                        return int(mockedObj.addOrUpdateCalledNum.Load())
-               }, 5*time.Second).Should(gomega.BeNumerically(">=", 2))
+               }, flags.EventuallyTimeout).Should(gomega.BeNumerically(">=", 
2))
+       })
+
+       ginkgo.It("should not load node with revision -1", func() {
+               err := registry.RegisterNode(context.Background(), 
&databasev1.Node{
+                       Metadata: &commonv1.Metadata{
+                               Name: "testnode",
+                       },
+                       Roles: []databasev1.Role{
+                               databasev1.Role_ROLE_DATA,
+                       },
+               }, false)
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               nn, err := registry.ListNode(context.Background(), 
databasev1.Role_ROLE_DATA)
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               gomega.Expect(len(nn)).To(gomega.Equal(1))
+               gomega.Expect(nn[0].Metadata.Name).To(gomega.Equal("testnode"))
+
+               err = registry.Close()
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               // Recreate registry for this test
+               registry, err = schema.NewEtcdSchemaRegistry(
+                       schema.Namespace("test"),
+                       schema.ConfigureServerEndpoints(endpoints),
+               )
+               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+               watcher := registry.NewWatcher("test", schema.KindNode, -1, 
schema.CheckInterval(1*time.Hour))
+               watcher.AddHandler(mockedObj)
+               watcher.Start()
+               ginkgo.DeferCleanup(func() {
+                       watcher.Close()
+               })
+               gomega.Consistently(func() int {
+                       return int(mockedObj.addOrUpdateCalledNum.Load())
+               }, flags.ConsistentlyTimeout).Should(gomega.BeZero())
+       })
+
+       ginkgo.It("should load and delete node with revision 0", func() {
+               // Register node again for this test
+               err := registry.RegisterNode(context.Background(), 
&databasev1.Node{
+                       Metadata: &commonv1.Metadata{
+                               Name: "testnode",
+                       },
+                       Roles: []databasev1.Role{
+                               databasev1.Role_ROLE_DATA,
+                       },
+               }, false)
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               err = registry.Close()
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               // Recreate registry for this test
+               registry, err = schema.NewEtcdSchemaRegistry(
+                       schema.Namespace("test"),
+                       schema.ConfigureServerEndpoints(endpoints),
+               )
+               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+               watcher := registry.NewWatcher("test", schema.KindNode, 0, 
schema.CheckInterval(1*time.Hour))
+               watcher.AddHandler(mockedObj)
+               watcher.Start()
+               ginkgo.DeferCleanup(func() {
+                       watcher.Close()
+               })
+               gomega.Eventually(func() int {
+                       return int(mockedObj.addOrUpdateCalledNum.Load())
+               }, flags.EventuallyTimeout).Should(gomega.Equal(1))
+               gomega.Eventually(func() int {
+                       return int(mockedObj.deleteCalledNum.Load())
+               }, flags.EventuallyTimeout).Should(gomega.Equal(1))
+               gomega.Expect(mockedObj.Data()).To(gomega.BeEmpty())
        })
 })
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index d49baeef..8cfc0ea1 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -95,7 +95,9 @@ func (p *pub) Broadcast(timeout time.Duration, topic 
bus.Topic, messages bus.Mes
        var nodes []*databasev1.Node
        p.mu.RLock()
        for k := range p.active {
-               nodes = append(nodes, p.registered[k])
+               if n := p.registered[k]; n != nil {
+                       nodes = append(nodes, n)
+               }
        }
        p.mu.RUnlock()
        if len(nodes) == 0 {
@@ -104,7 +106,7 @@ func (p *pub) Broadcast(timeout time.Duration, topic 
bus.Topic, messages bus.Mes
        names := make(map[string]struct{})
        if len(messages.NodeSelectors()) == 0 {
                for _, n := range nodes {
-                       names[n.Metadata.Name] = struct{}{}
+                       names[n.Metadata.GetName()] = struct{}{}
                }
        } else {
                for g, sel := range messages.NodeSelectors() {
diff --git a/pkg/test/flags/flags.go b/pkg/test/flags/flags.go
index 0cbd084f..a870d66f 100644
--- a/pkg/test/flags/flags.go
+++ b/pkg/test/flags/flags.go
@@ -27,9 +27,14 @@ var (
 
        neverTimeout string
 
+       consistentlyTimeout string
+
        // EventuallyTimeout is the timeout of async time cases execution.
        EventuallyTimeout time.Duration
 
+       // ConsistentlyTimeout is the timeout of async time cases execution.
+       ConsistentlyTimeout time.Duration
+
        // NeverTimeout is the timeout of async time cases execution.
        NeverTimeout time.Duration
 
@@ -44,6 +49,9 @@ func init() {
        if neverTimeout == "" {
                neverTimeout = "2s"
        }
+       if consistentlyTimeout == "" {
+               consistentlyTimeout = "5s"
+       }
        d, err := time.ParseDuration(eventuallyTimeout)
        if err != nil {
                panic(err)
@@ -54,4 +62,9 @@ func init() {
                panic(err)
        }
        NeverTimeout = d
+       d, err = time.ParseDuration(consistentlyTimeout)
+       if err != nil {
+               panic(err)
+       }
+       ConsistentlyTimeout = d
 }

Reply via email to