This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch etcd
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit ad2fbece9f8d473ed8feb0fb487f19225e424236
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Apr 18 13:33:13 2025 +0000

    Fix the issue that the etcd watcher gets the historical node registration events

    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                              |  1 +
 Makefile                                |  2 +-
 banyand/metadata/schema/etcd.go         | 10 ++--
 banyand/metadata/schema/schema.go       |  2 +-
 banyand/metadata/schema/watcher_test.go | 85 +++++++++++++++++++++++++++++----
 banyand/queue/pub/pub.go                |  6 ++-
 pkg/test/flags/flags.go                 | 13 +++++
 7 files changed, 100 insertions(+), 19 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a0cecbf9..0da6ead6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -12,6 +12,7 @@ Release Notes.
 ### Bug Fixes
 
 - Fix the deadlock issue when loading a closed segment.
+- Fix the issue that the etcd watcher gets the historical node registration events.
 
 ## 0.8.0
diff --git a/Makefile b/Makefile
index 0d6a1747..9a962cae 100644
--- a/Makefile
+++ b/Makefile
@@ -74,7 +74,7 @@ include scripts/build/ginkgo.mk
 test-ci: $(GINKGO) ## Run the unit tests in CI
     $(GINKGO) --race \
     -ldflags \
-    "-X github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=error" \
+    "-X github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X github.com/apache/skywalking-banyandb/pkg/test/flags.consistentlyTimeout=10s -X github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=error" \
     $(TEST_CI_OPTS) \
     ./...
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 41f66f1c..eebd6eef 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -172,7 +172,7 @@ func (e *etcdSchemaRegistry) RegisterHandler(name string, kind Kind, handler Eve
         return
     }
     for i := range kinds {
-        e.registerToWatcher(name, kinds[i], 0, handler)
+        e.registerToWatcher(name, kinds[i], -1, handler)
     }
 }
 
@@ -186,7 +186,7 @@ func (e *etcdSchemaRegistry) registerToWatcher(name string, kind Kind, revision
         return
     }
     e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering to a new watcher")
-    w := e.newWatcherWithRevision(name, kind, revision, CheckInterval(e.checkInterval))
+    w := e.NewWatcher(name, kind, revision, CheckInterval(e.checkInterval))
     w.AddHandler(handler)
     e.watchers[kind] = w
 }
@@ -578,11 +578,7 @@ func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) {
     }
 }
 
-func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, opts ...WatcherOption) *watcher {
-    return e.newWatcherWithRevision(name, kind, 0, opts...)
-}
-
-func (e *etcdSchemaRegistry) newWatcherWithRevision(name string, kind Kind, revision int64, opts ...WatcherOption) *watcher {
+func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, revision int64, opts ...WatcherOption) *watcher {
     wc := watcherConfig{
         key:  e.prependNamespace(kind.key()),
         kind: kind,
diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go
index 472a34ed..d103536f 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -66,7 +66,7 @@ type Registry interface {
     Node
     Property
     RegisterHandler(string, Kind, EventHandler)
-    NewWatcher(string, Kind, ...WatcherOption) *watcher
+    NewWatcher(string, Kind, int64, ...WatcherOption) *watcher
     Register(context.Context, Metadata, bool) error
     Compact(context.Context, int64) error
     StartWatcher()
diff --git a/banyand/metadata/schema/watcher_test.go b/banyand/metadata/schema/watcher_test.go
index c2fa90dd..20c0b7e1 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -81,6 +81,7 @@ var _ = ginkgo.Describe("Watcher", func() {
         server    embeddedetcd.Server
         registry  schema.Registry
         defFn     func()
+        endpoints []string
     )
 
     ginkgo.BeforeEach(func() {
@@ -98,7 +99,7 @@ var _ = ginkgo.Describe("Watcher", func() {
         if err != nil {
             panic("fail to find free ports")
         }
-        endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
+        endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
         server, err = embeddedetcd.NewServer(
             embeddedetcd.ConfigureListener(endpoints, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
             embeddedetcd.RootDir(path))
@@ -172,7 +173,7 @@ var _ = ginkgo.Describe("Watcher", func() {
         }
 
         // Start the watcher
-        watcher := registry.NewWatcher("test", schema.KindMeasure)
+        watcher := registry.NewWatcher("test", schema.KindMeasure, 0)
         watcher.AddHandler(mockedObj)
         watcher.Start()
         ginkgo.DeferCleanup(func() {
@@ -191,7 +192,7 @@ var _ = ginkgo.Describe("Watcher", func() {
         }, flags.EventuallyTimeout).Should(gomega.BeTrue())
     })
     ginkgo.It("should handle watch events", func() {
-        watcher := registry.NewWatcher("test", schema.KindStream)
+        watcher := registry.NewWatcher("test", schema.KindStream, 0)
         watcher.AddHandler(mockedObj)
         watcher.Start()
         ginkgo.DeferCleanup(func() {
@@ -316,7 +317,7 @@ var _ = ginkgo.Describe("Watcher", func() {
         })
         gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
-        watcher := registry.NewWatcher("test", schema.KindMeasure, schema.CheckInterval(1*time.Second))
+        watcher := registry.NewWatcher("test", schema.KindMeasure, 0, schema.CheckInterval(1*time.Second))
         watcher.AddHandler(mockedObj)
         watcher.Start()
         ginkgo.DeferCleanup(func() {
@@ -331,7 +332,7 @@ var _ = ginkgo.Describe("Watcher", func() {
     })
 
     ginkgo.It("should detect deletions", func() {
-        watcher := registry.NewWatcher("test", schema.KindMeasure, schema.CheckInterval(1*time.Second))
+        watcher := registry.NewWatcher("test", schema.KindMeasure, 0, schema.CheckInterval(1*time.Second))
         watcher.AddHandler(mockedObj)
         watcher.Start()
         ginkgo.DeferCleanup(func() {
@@ -395,12 +396,12 @@ var _ = ginkgo.Describe("Watcher", func() {
 
         gomega.Eventually(func() int {
             return int(mockedObj.deleteCalledNum.Load())
-        }, 5*time.Second).Should(gomega.Equal(1))
+        }, flags.EventuallyTimeout).Should(gomega.Equal(1))
         gomega.Expect(mockedObj.Data()).NotTo(gomega.HaveKey(measureName))
     })
 
     ginkgo.It("should recover state after compaction", func() {
-        watcher := registry.NewWatcher("test", schema.KindMeasure, schema.CheckInterval(1*time.Hour))
+        watcher := registry.NewWatcher("test", schema.KindMeasure, 0, schema.CheckInterval(1*time.Hour))
         watcher.AddHandler(mockedObj)
         watcher.Start()
         ginkgo.DeferCleanup(func() {
@@ -487,6 +488,74 @@ var _ = ginkgo.Describe("Watcher", func() {
 
         gomega.Eventually(func() int {
             return int(mockedObj.addOrUpdateCalledNum.Load())
-        }, 5*time.Second).Should(gomega.BeNumerically(">=", 2))
+        }, flags.EventuallyTimeout).Should(gomega.BeNumerically(">=", 2))
+    })
+
+    ginkgo.It("should not load node with revision -1", func() {
+        err := registry.RegisterNode(context.Background(), &databasev1.Node{
+            Metadata: &commonv1.Metadata{
+                Name: "testnode",
+            },
+            Roles: []databasev1.Role{
+                databasev1.Role_ROLE_DATA,
+            },
+        }, false)
+        gomega.Expect(err).NotTo(gomega.HaveOccurred())
+        nn, err := registry.ListNode(context.Background(), databasev1.Role_ROLE_DATA)
+        gomega.Expect(err).NotTo(gomega.HaveOccurred())
+        gomega.Expect(len(nn)).To(gomega.Equal(1))
+        gomega.Expect(nn[0].Metadata.Name).To(gomega.Equal("testnode"))
+
+        err = registry.Close()
+        gomega.Expect(err).NotTo(gomega.HaveOccurred())
+        // Recreate registry for this test
+        registry, err = schema.NewEtcdSchemaRegistry(
+            schema.Namespace("test"),
+            schema.ConfigureServerEndpoints(endpoints),
+        )
+        gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+        watcher := registry.NewWatcher("test", schema.KindNode, -1, schema.CheckInterval(1*time.Hour))
+        watcher.AddHandler(mockedObj)
+        watcher.Start()
+        ginkgo.DeferCleanup(func() {
+            watcher.Close()
+        })
+        gomega.Consistently(func() int {
+            return int(mockedObj.addOrUpdateCalledNum.Load())
+        }, flags.ConsistentlyTimeout).Should(gomega.BeZero())
+    })
+
+    ginkgo.It("should load and delete node with revision 0", func() {
+        // Register node again for this test
+        err := registry.RegisterNode(context.Background(), &databasev1.Node{
+            Metadata: &commonv1.Metadata{
+                Name: "testnode",
+            },
+            Roles: []databasev1.Role{
+                databasev1.Role_ROLE_DATA,
+            },
+        }, false)
+        gomega.Expect(err).NotTo(gomega.HaveOccurred())
+        err = registry.Close()
+        gomega.Expect(err).NotTo(gomega.HaveOccurred())
+        // Recreate registry for this test
+        registry, err = schema.NewEtcdSchemaRegistry(
+            schema.Namespace("test"),
+            schema.ConfigureServerEndpoints(endpoints),
+        )
+        gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+        watcher := registry.NewWatcher("test", schema.KindNode, 0, schema.CheckInterval(1*time.Hour))
+        watcher.AddHandler(mockedObj)
+        watcher.Start()
+        ginkgo.DeferCleanup(func() {
+            watcher.Close()
+        })
+        gomega.Eventually(func() int {
+            return int(mockedObj.addOrUpdateCalledNum.Load())
+        }, flags.EventuallyTimeout).Should(gomega.Equal(1))
+        gomega.Eventually(func() int {
+            return int(mockedObj.deleteCalledNum.Load())
+        }, flags.EventuallyTimeout).Should(gomega.Equal(1))
+        gomega.Expect(mockedObj.Data()).To(gomega.BeEmpty())
     })
 })
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index d49baeef..8cfc0ea1 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -95,7 +95,9 @@ func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Mes
     var nodes []*databasev1.Node
     p.mu.RLock()
     for k := range p.active {
-        nodes = append(nodes, p.registered[k])
+        if n := p.registered[k]; n != nil {
+            nodes = append(nodes, n)
+        }
     }
     p.mu.RUnlock()
     if len(nodes) == 0 {
@@ -104,7 +106,7 @@ func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Mes
     names := make(map[string]struct{})
     if len(messages.NodeSelectors()) == 0 {
         for _, n := range nodes {
-            names[n.Metadata.Name] = struct{}{}
+            names[n.Metadata.GetName()] = struct{}{}
         }
     } else {
         for g, sel := range messages.NodeSelectors() {
diff --git a/pkg/test/flags/flags.go b/pkg/test/flags/flags.go
index 0cbd084f..a870d66f 100644
--- a/pkg/test/flags/flags.go
+++ b/pkg/test/flags/flags.go
@@ -27,9 +27,14 @@ var (
 
     neverTimeout string
 
+    consistentlyTimeout string
+
     // EventuallyTimeout is the timeout of async time cases execution.
     EventuallyTimeout time.Duration
 
+    // ConsistentlyTimeout is the timeout of async time cases execution.
+    ConsistentlyTimeout time.Duration
+
     // NeverTimeout is the timeout of async time cases execution.
     NeverTimeout time.Duration
 
@@ -44,6 +49,9 @@ func init() {
     if neverTimeout == "" {
         neverTimeout = "2s"
     }
+    if consistentlyTimeout == "" {
+        consistentlyTimeout = "5s"
+    }
     d, err := time.ParseDuration(eventuallyTimeout)
     if err != nil {
         panic(err)
     }
@@ -54,4 +62,9 @@
         panic(err)
     }
     NeverTimeout = d
+    d, err = time.ParseDuration(consistentlyTimeout)
+    if err != nil {
+        panic(err)
+    }
+    ConsistentlyTimeout = d
 }
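
Note: the sketch below is illustrative only and is not part of this commit. It assumes a schema.Registry and a schema.EventHandler are already available; the package name, function name, watcher name "example", and the one-minute check interval are placeholders. It only restates the revision semantics exercised by the new tests: with revision 0 a watcher still delivers node registrations that happened before it started, while revision -1 (what RegisterHandler now passes internally) skips those historical events.

package example // hypothetical package, not code from the commit

import (
    "time"

    "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
)

// startNodeWatchers sketches how the new revision argument of NewWatcher is
// intended to be used. The registry and handler are assumed to exist already.
func startNodeWatchers(registry schema.Registry, handler schema.EventHandler) {
    // Revision 0: the handler also receives events for nodes that were
    // registered before the watcher started (the behavior the old code had).
    stateful := registry.NewWatcher("example", schema.KindNode, 0, schema.CheckInterval(time.Minute))
    stateful.AddHandler(handler)
    stateful.Start()

    // Revision -1: historical node registrations are not delivered; this is
    // what RegisterHandler now passes for its internal watchers.
    liveOnly := registry.NewWatcher("example", schema.KindNode, -1, schema.CheckInterval(time.Minute))
    liveOnly.AddHandler(handler)
    liveOnly.Start()
}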