This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch etcd-keepalive in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 4423f0860b45825ed98c326bf8c571f85c88fa68 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Thu Jun 20 13:18:35 2024 +0800 Fix the data node can't re-register to etcd Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- CHANGES.md | 1 + banyand/internal/storage/segment.go | 40 ++++++---- banyand/internal/storage/version.go | 28 +++++-- banyand/metadata/schema/etcd.go | 125 ++++++++++++++++++++++++------- banyand/metadata/schema/etcd_test.go | 10 +-- banyand/metadata/schema/register_test.go | 35 ++++++++- banyand/metadata/schema/watcher_test.go | 9 ++- pkg/logger/logger.go | 11 ++- 8 files changed, 200 insertions(+), 59 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 89623919..52332c17 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ Release Notes. - Fix the filtering of stream in descending order by timestamp. - Fix querying old data points when the data is in a newer part. A version column is introduced to each data point and stored in the timestamp file. - Fix the bug that duplicated data points from different data nodes are returned. +- Fix the bug that the data node can't re-register to etcd when the connection is lost. ## 0.6.1 diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 3ad6d984..7c70e448 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -19,8 +19,8 @@ package storage import ( "context" - "errors" "fmt" + "io/fs" "path" "path/filepath" "sort" @@ -29,8 +29,11 @@ import ( "sync/atomic" "time" + "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/internal/bucket" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -209,25 +212,36 @@ func (sc *segmentController[T, O]) Parse(value string) (time.Time, error) { func (sc *segmentController[T, O]) open() error { sc.Lock() defer sc.Unlock() - return loadSegments(sc.location, segPathPrefix, sc, sc.segmentSize, func(start, end time.Time) error { - compatibleVersions, err := readCompatibleVersions() - if err != nil { - return err - } + emptySegments := make([]string, 0) + err := loadSegments(sc.location, segPathPrefix, sc, sc.segmentSize, func(start, end time.Time) error { suffix := sc.Format(start) - metadataPath := path.Join(sc.location, fmt.Sprintf(segTemplate, suffix), metadataFilename) + segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, suffix)) + metadataPath := path.Join(segmentPath, metadataFilename) version, err := lfs.Read(metadataPath) if err != nil { + if errors.Is(err, fs.ErrNotExist) { + emptySegments = append(emptySegments, segmentPath) + return nil + } return err } - for _, cv := range compatibleVersions[compatibleVersionsKey] { - if string(version) == cv { - _, err := sc.load(start, end, sc.location) - return err - } + if len(version) == 0 { + emptySegments = append(emptySegments, segmentPath) + return nil + } + if err = checkVersion(convert.BytesToString(version)); err != nil { + return err } - return errVersionIncompatible + _, err = sc.load(start, end, sc.location) + return err }) + if len(emptySegments) > 0 { + sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments found, removing them.") + for i := range emptySegments { + lfs.MustRMAll(emptySegments[i]) + } + } + return err } func (sc *segmentController[T, O]) create(start time.Time) (*segment[T], error) { diff --git a/banyand/internal/storage/version.go b/banyand/internal/storage/version.go index e1cbbd06..ddae25c4 100644 --- a/banyand/internal/storage/version.go +++ b/banyand/internal/storage/version.go @@ -20,8 +20,9 @@ package storage import ( "embed" "encoding/json" - "errors" + "strings" + "github.com/pkg/errors" "sigs.k8s.io/yaml" ) @@ -34,21 +35,36 @@ const ( var errVersionIncompatible = errors.New("version not compatible") +var compatibleVersions = readCompatibleVersions() + //go:embed versions.yml var versionFS embed.FS -func readCompatibleVersions() (map[string][]string, error) { +func checkVersion(version string) error { + for _, v := range compatibleVersions { + if v == version { + return nil + } + } + return errors.WithMessagef(errVersionIncompatible, "incompatible version %s, supported versions: %s", version, strings.Join(compatibleVersions, ", ")) +} + +func readCompatibleVersions() []string { i, err := versionFS.ReadFile(compatibleVersionsFilename) if err != nil { - return nil, err + panic(err) } j, err := yaml.YAMLToJSON(i) if err != nil { - return nil, err + panic(err) } var compatibleVersions map[string][]string if err := json.Unmarshal(j, &compatibleVersions); err != nil { - return nil, err + panic(err) + } + vv, ok := compatibleVersions[compatibleVersionsKey] + if !ok { + panic("versions not found") } - return compatibleVersions, nil + return vv } diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go index 249df6e2..38b5615a 100644 --- a/banyand/metadata/schema/etcd.go +++ b/banyand/metadata/schema/etcd.go @@ -380,82 +380,151 @@ func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) (boo return false, nil } +const leaseDuration = 5 * time.Second + func (e *etcdSchemaRegistry) Register(ctx context.Context, metadata Metadata, forced bool) error { if !e.closer.AddRunning() { return ErrClosed } defer e.closer.Done() - key, err := metadata.key() + + key, err := e.prepareKey(metadata) if err != nil { return err } - key = e.prependNamespace(key) - val, err := proto.Marshal(metadata.Spec.(proto.Message)) + + val, err := e.prepareValue(metadata) if err != nil { return err } - // Create a lease with a short TTL - lease, err := e.client.Grant(ctx, 5) // 5 seconds + + lease, err := e.client.Grant(ctx, int64(leaseDuration.Seconds())) if err != nil { + return fmt.Errorf("failed to grant lease for key %s: %w", key, err) + } + + if err := e.putKeyVal(ctx, key, val, lease, forced); err != nil { return err } + + //nolint:contextcheck + if err := e.keepLeaseAlive(lease, key, val); err != nil { + return fmt.Errorf("failed to keep lease alive for key %s: %w", key, err) + } + + return nil +} + +func (e *etcdSchemaRegistry) prepareKey(metadata Metadata) (string, error) { + key, err := metadata.key() + if err != nil { + return "", err + } + return e.prependNamespace(key), nil +} + +func (e *etcdSchemaRegistry) prepareValue(metadata Metadata) (string, error) { + val, err := proto.Marshal(metadata.Spec.(proto.Message)) + if err != nil { + return "", err + } + return string(val), nil +} + +func (e *etcdSchemaRegistry) putKeyVal(ctx context.Context, key, val string, lease *clientv3.LeaseGrantResponse, forced bool) error { if forced { - if _, err = e.client.Put(ctx, key, string(val), clientv3.WithLease(lease.ID)); err != nil { - return err + if _, err := e.client.Put(ctx, key, val, clientv3.WithLease(lease.ID)); err != nil { + return fmt.Errorf("failed to forcefully put key-value pair for key %s: %w", key, err) } } else { - var ops []clientv3.Cmp - ops = append(ops, clientv3.Compare(clientv3.CreateRevision(key), "=", 0)) + ops := []clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision(key), "=", 0)} txn := e.client.Txn(ctx).If(ops...) - txn = txn.Then(clientv3.OpPut(key, string(val), clientv3.WithLease(lease.ID))) + txn = txn.Then(clientv3.OpPut(key, val, clientv3.WithLease(lease.ID))) txn = txn.Else(clientv3.OpGet(key)) - response, errCommit := txn.Commit() - if errCommit != nil { - return errCommit + response, err := txn.Commit() + if err != nil { + return fmt.Errorf("failed to commit transaction for key %s: %w", key, err) } - if !response.Succeeded { tr := pb.TxnResponse(*response) - return errors.Wrapf(ErrGRPCAlreadyExists, "response: %s", tr.String()) + return errors.Wrapf(ErrGRPCAlreadyExists, "key %s, response: %s", key, tr.String()) } } + return nil +} - // Keep the lease alive - // nolint:contextcheck +func (e *etcdSchemaRegistry) keepLeaseAlive(lease *clientv3.LeaseGrantResponse, key, val string) error { keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID) if err != nil { - return err + return fmt.Errorf("failed to keep lease alive for key %s: %w", key, err) } - // nolint:contextcheck + go func() { if !e.closer.AddRunning() { return } defer func() { - e.l.Info().Msgf("revoking lease %d", lease.ID) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - _, err = e.client.Lease.Revoke(ctx, lease.ID) - cancel() - if err != nil { - e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID) - } + e.revokeLease(lease) e.closer.Done() }() + for { select { case <-e.closer.CloseNotify(): return case keepAliveResp := <-keepAliveChan: if keepAliveResp == nil { - // The channel has been closed - return + keepAliveChan = e.revokeAndReconnectLease(lease, key, val) } } } }() + return nil } +func (e *etcdSchemaRegistry) revokeAndReconnectLease(lease *clientv3.LeaseGrantResponse, key, val string) <-chan *clientv3.LeaseKeepAliveResponse { + for { + e.revokeLease(lease) + select { + case <-e.closer.CloseNotify(): + return nil + default: + lease, err := e.client.Grant(context.Background(), int64(leaseDuration.Seconds())) + if err != nil { + e.l.Error().Err(err).Msg("failed to grant lease") + time.Sleep(leaseDuration) + continue + } + _, err = e.client.Put(context.Background(), key, val, clientv3.WithLease(lease.ID)) + if err != nil { + e.l.Error().Err(err).Msg("failed to put key-value pair") + time.Sleep(leaseDuration) + continue + } + keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID) + if err != nil { + e.l.Error().Err(err).Msg("failed to keep alive") + time.Sleep(leaseDuration) + } else { + return keepAliveChan + } + } + } +} + +func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) { + if lease == nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), leaseDuration) + defer cancel() + _, err := e.client.Lease.Revoke(ctx, lease.ID) + if err != nil { + e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID) + } +} + func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, handler watchEventHandler) *watcher { return e.newWatcherWithRevision(name, kind, 0, handler) } diff --git a/banyand/metadata/schema/etcd_test.go b/banyand/metadata/schema/etcd_test.go index 6bb877ee..d288d8cb 100644 --- a/banyand/metadata/schema/etcd_test.go +++ b/banyand/metadata/schema/etcd_test.go @@ -21,11 +21,9 @@ import ( "context" "embed" "fmt" - "os" "path" "testing" - "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/encoding/protojson" @@ -100,11 +98,8 @@ func preloadSchema(e schema.Registry) error { return nil } -func randomTempDir() string { - return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", uuid.New().String())) -} - func initServerAndRegister(t *testing.T) (schema.Registry, func()) { + path, defFn := test.Space(require.New(t)) req := require.New(t) ports, err := test.AllocateFreePorts(2) if err != nil { @@ -113,7 +108,7 @@ func initServerAndRegister(t *testing.T) (schema.Registry, func()) { endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])} server, err := embeddedetcd.NewServer( embeddedetcd.ConfigureListener(endpoints, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), - embeddedetcd.RootDir(randomTempDir())) + embeddedetcd.RootDir(path)) req.NoError(err) req.NotNil(server) <-server.ReadyNotify() @@ -124,6 +119,7 @@ func initServerAndRegister(t *testing.T) (schema.Registry, func()) { server.Close() <-server.StopNotify() schemaRegistry.Close() + defFn() } } diff --git a/banyand/metadata/schema/register_test.go b/banyand/metadata/schema/register_test.go index 8e373cfc..1e9ceef8 100644 --- a/banyand/metadata/schema/register_test.go +++ b/banyand/metadata/schema/register_test.go @@ -20,6 +20,8 @@ package schema_test import ( "context" "fmt" + "os" + "time" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" @@ -34,7 +36,9 @@ import ( ) var _ = ginkgo.Describe("etcd_register", func() { - var endpoints []string + var path string + var defFn func() + var endpoints, peers []string var goods []gleak.Goroutine var server embeddedetcd.Server var r schema.Registry @@ -51,13 +55,17 @@ var _ = ginkgo.Describe("etcd_register", func() { }, } ginkgo.BeforeEach(func() { + var err error + path, defFn, err = test.NewSpace() + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) goods = gleak.Goroutines() ports, err := test.AllocateFreePorts(2) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])} + peers = []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])} server, err = embeddedetcd.NewServer( - embeddedetcd.ConfigureListener(endpoints, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), - embeddedetcd.RootDir(randomTempDir())) + embeddedetcd.ConfigureListener(endpoints, peers), + embeddedetcd.RootDir(path)) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) <-server.ReadyNotify() r, err = schema.NewEtcdSchemaRegistry( @@ -70,6 +78,7 @@ var _ = ginkgo.Describe("etcd_register", func() { gomega.Expect(r.Close()).ShouldNot(gomega.HaveOccurred()) server.Close() gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + defFn() }) ginkgo.It("should revoke the leaser", func() { @@ -89,4 +98,24 @@ var _ = ginkgo.Describe("etcd_register", func() { gomega.Expect(r.Register(context.Background(), md, false)).ShouldNot(gomega.HaveOccurred()) gomega.Expect(r.Register(context.Background(), md, false)).Should(gomega.MatchError(schema.ErrGRPCAlreadyExists)) }) + + ginkgo.It("should reconnect", func() { + gomega.Expect(r.Register(context.Background(), md, true)).ShouldNot(gomega.HaveOccurred()) + _, err := r.GetNode(context.Background(), node) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(server.Close()).ShouldNot(gomega.HaveOccurred()) + time.Sleep(1 * time.Second) + os.RemoveAll(path) + + server, err = embeddedetcd.NewServer( + embeddedetcd.ConfigureListener(endpoints, peers), + embeddedetcd.RootDir(path)) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + <-server.ReadyNotify() + + gomega.Eventually(func() error { + _, err := r.GetNode(context.Background(), node) + return err + }, flags.EventuallyTimeout).ShouldNot(gomega.HaveOccurred()) + }) }) diff --git a/banyand/metadata/schema/watcher_test.go b/banyand/metadata/schema/watcher_test.go index 472f863c..65c45b40 100644 --- a/banyand/metadata/schema/watcher_test.go +++ b/banyand/metadata/schema/watcher_test.go @@ -79,6 +79,7 @@ var _ = ginkgo.Describe("Watcher", func() { mockedObj *mockedHandler server embeddedetcd.Server registry schema.Registry + defFn func() ) ginkgo.BeforeEach(func() { @@ -87,6 +88,11 @@ var _ = ginkgo.Describe("Watcher", func() { Env: "dev", Level: flags.LogLevel, })).To(gomega.Succeed()) + var path string + var err error + path, defFn, err = test.NewSpace() + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + ports, err := test.AllocateFreePorts(2) if err != nil { panic("fail to find free ports") @@ -94,7 +100,7 @@ var _ = ginkgo.Describe("Watcher", func() { endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])} server, err = embeddedetcd.NewServer( embeddedetcd.ConfigureListener(endpoints, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), - embeddedetcd.RootDir(randomTempDir())) + embeddedetcd.RootDir(path)) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) <-server.ReadyNotify() registry, err = schema.NewEtcdSchemaRegistry( @@ -107,6 +113,7 @@ var _ = ginkgo.Describe("Watcher", func() { registry.Close() server.Close() <-server.StopNotify() + defFn() }) ginkgo.It("should handle all existing key-value pairs on initial load", func() { diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 9743503f..b967083a 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -96,7 +96,16 @@ func (l *Logger) ToZapConfig() zap.Config { } if !l.development { config := zap.NewProductionConfig() - config.Level = zap.NewAtomicLevelAt(zap.ErrorLevel) + switch l.GetLevel() { + case zerolog.DebugLevel: + config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + case zerolog.InfoLevel: + config.Level = zap.NewAtomicLevelAt(zap.InfoLevel) + case zerolog.WarnLevel: + config.Level = zap.NewAtomicLevelAt(zap.WarnLevel) + default: + config.Level = zap.NewAtomicLevelAt(zap.ErrorLevel) + } return config } encoderConfig := zapcore.EncoderConfig{