This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch etcd-keepalive
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 4423f0860b45825ed98c326bf8c571f85c88fa68
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Jun 20 13:18:35 2024 +0800

    Fix the data node can't re-register to etcd
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                               |   1 +
 banyand/internal/storage/segment.go      |  40 ++++++----
 banyand/internal/storage/version.go      |  28 +++++--
 banyand/metadata/schema/etcd.go          | 125 ++++++++++++++++++++++++-------
 banyand/metadata/schema/etcd_test.go     |  10 +--
 banyand/metadata/schema/register_test.go |  35 ++++++++-
 banyand/metadata/schema/watcher_test.go  |   9 ++-
 pkg/logger/logger.go                     |  11 ++-
 8 files changed, 200 insertions(+), 59 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 89623919..52332c17 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,7 @@ Release Notes.
 - Fix the filtering of stream in descending order by timestamp.
 - Fix querying old data points when the data is in a newer part. A version column is introduced to each data point and stored in the timestamp file.
 - Fix the bug that duplicated data points from different data nodes are returned.
+- Fix the bug that the data node can't re-register to etcd when the connection is lost.
 
 ## 0.6.1
 
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 3ad6d984..7c70e448 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -19,8 +19,8 @@ package storage
 
 import (
        "context"
-       "errors"
        "fmt"
+       "io/fs"
        "path"
        "path/filepath"
        "sort"
@@ -29,8 +29,11 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/pkg/errors"
+
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/internal/bucket"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -209,25 +212,36 @@ func (sc *segmentController[T, O]) Parse(value string) 
(time.Time, error) {
 func (sc *segmentController[T, O]) open() error {
        sc.Lock()
        defer sc.Unlock()
-       return loadSegments(sc.location, segPathPrefix, sc, sc.segmentSize, 
func(start, end time.Time) error {
-               compatibleVersions, err := readCompatibleVersions()
-               if err != nil {
-                       return err
-               }
+       emptySegments := make([]string, 0)
+       err := loadSegments(sc.location, segPathPrefix, sc, sc.segmentSize, 
func(start, end time.Time) error {
                suffix := sc.Format(start)
-               metadataPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
suffix), metadataFilename)
+               segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
suffix))
+               metadataPath := path.Join(segmentPath, metadataFilename)
                version, err := lfs.Read(metadataPath)
                if err != nil {
+                       if errors.Is(err, fs.ErrNotExist) {
+                               emptySegments = append(emptySegments, 
segmentPath)
+                               return nil
+                       }
                        return err
                }
-               for _, cv := range compatibleVersions[compatibleVersionsKey] {
-                       if string(version) == cv {
-                               _, err := sc.load(start, end, sc.location)
-                               return err
-                       }
+               if len(version) == 0 {
+                       emptySegments = append(emptySegments, segmentPath)
+                       return nil
+               }
+               if err = checkVersion(convert.BytesToString(version)); err != 
nil {
+                       return err
                }
-               return errVersionIncompatible
+               _, err = sc.load(start, end, sc.location)
+               return err
        })
+       if len(emptySegments) > 0 {
+               sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments 
found, removing them.")
+               for i := range emptySegments {
+                       lfs.MustRMAll(emptySegments[i])
+               }
+       }
+       return err
 }
 
 func (sc *segmentController[T, O]) create(start time.Time) (*segment[T], 
error) {
diff --git a/banyand/internal/storage/version.go 
b/banyand/internal/storage/version.go
index e1cbbd06..ddae25c4 100644
--- a/banyand/internal/storage/version.go
+++ b/banyand/internal/storage/version.go
@@ -20,8 +20,9 @@ package storage
 import (
        "embed"
        "encoding/json"
-       "errors"
+       "strings"
 
+       "github.com/pkg/errors"
        "sigs.k8s.io/yaml"
 )
 
@@ -34,21 +35,36 @@ const (
 
 var errVersionIncompatible = errors.New("version not compatible")
 
+var compatibleVersions = readCompatibleVersions()
+
 //go:embed versions.yml
 var versionFS embed.FS
 
-func readCompatibleVersions() (map[string][]string, error) {
+func checkVersion(version string) error {
+       for _, v := range compatibleVersions {
+               if v == version {
+                       return nil
+               }
+       }
+       return errors.WithMessagef(errVersionIncompatible, "incompatible 
version %s, supported versions: %s", version, strings.Join(compatibleVersions, 
", "))
+}
+
+func readCompatibleVersions() []string {
        i, err := versionFS.ReadFile(compatibleVersionsFilename)
        if err != nil {
-               return nil, err
+               panic(err)
        }
        j, err := yaml.YAMLToJSON(i)
        if err != nil {
-               return nil, err
+               panic(err)
        }
        var compatibleVersions map[string][]string
        if err := json.Unmarshal(j, &compatibleVersions); err != nil {
-               return nil, err
+               panic(err)
+       }
+       vv, ok := compatibleVersions[compatibleVersionsKey]
+       if !ok {
+               panic("versions not found")
        }
-       return compatibleVersions, nil
+       return vv
 }
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 249df6e2..38b5615a 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -380,82 +380,151 @@ func (e *etcdSchemaRegistry) delete(ctx context.Context, 
metadata Metadata) (boo
        return false, nil
 }
 
+const leaseDuration = 5 * time.Second
+
 func (e *etcdSchemaRegistry) Register(ctx context.Context, metadata Metadata, 
forced bool) error {
        if !e.closer.AddRunning() {
                return ErrClosed
        }
        defer e.closer.Done()
-       key, err := metadata.key()
+
+       key, err := e.prepareKey(metadata)
        if err != nil {
                return err
        }
-       key = e.prependNamespace(key)
-       val, err := proto.Marshal(metadata.Spec.(proto.Message))
+
+       val, err := e.prepareValue(metadata)
        if err != nil {
                return err
        }
-       // Create a lease with a short TTL
-       lease, err := e.client.Grant(ctx, 5) // 5 seconds
+
+       lease, err := e.client.Grant(ctx, int64(leaseDuration.Seconds()))
        if err != nil {
+               return fmt.Errorf("failed to grant lease for key %s: %w", key, 
err)
+       }
+
+       if err := e.putKeyVal(ctx, key, val, lease, forced); err != nil {
                return err
        }
+
+       //nolint:contextcheck
+       if err := e.keepLeaseAlive(lease, key, val); err != nil {
+               return fmt.Errorf("failed to keep lease alive for key %s: %w", 
key, err)
+       }
+
+       return nil
+}
+
+func (e *etcdSchemaRegistry) prepareKey(metadata Metadata) (string, error) {
+       key, err := metadata.key()
+       if err != nil {
+               return "", err
+       }
+       return e.prependNamespace(key), nil
+}
+
+func (e *etcdSchemaRegistry) prepareValue(metadata Metadata) (string, error) {
+       val, err := proto.Marshal(metadata.Spec.(proto.Message))
+       if err != nil {
+               return "", err
+       }
+       return string(val), nil
+}
+
+func (e *etcdSchemaRegistry) putKeyVal(ctx context.Context, key, val string, 
lease *clientv3.LeaseGrantResponse, forced bool) error {
        if forced {
-               if _, err = e.client.Put(ctx, key, string(val), 
clientv3.WithLease(lease.ID)); err != nil {
-                       return err
+               if _, err := e.client.Put(ctx, key, val, 
clientv3.WithLease(lease.ID)); err != nil {
+                       return fmt.Errorf("failed to forcefully put key-value 
pair for key %s: %w", key, err)
                }
        } else {
-               var ops []clientv3.Cmp
-               ops = append(ops, 
clientv3.Compare(clientv3.CreateRevision(key), "=", 0))
+               ops := 
[]clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision(key), "=", 0)}
                txn := e.client.Txn(ctx).If(ops...)
-               txn = txn.Then(clientv3.OpPut(key, string(val), 
clientv3.WithLease(lease.ID)))
+               txn = txn.Then(clientv3.OpPut(key, val, 
clientv3.WithLease(lease.ID)))
                txn = txn.Else(clientv3.OpGet(key))
-               response, errCommit := txn.Commit()
-               if errCommit != nil {
-                       return errCommit
+               response, err := txn.Commit()
+               if err != nil {
+                       return fmt.Errorf("failed to commit transaction for key 
%s: %w", key, err)
                }
-
                if !response.Succeeded {
                        tr := pb.TxnResponse(*response)
-                       return errors.Wrapf(ErrGRPCAlreadyExists, "response: 
%s", tr.String())
+                       return errors.Wrapf(ErrGRPCAlreadyExists, "key %s, 
response: %s", key, tr.String())
                }
        }
+       return nil
+}
 
-       // Keep the lease alive
-       // nolint:contextcheck
+func (e *etcdSchemaRegistry) keepLeaseAlive(lease 
*clientv3.LeaseGrantResponse, key, val string) error {
        keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID)
        if err != nil {
-               return err
+               return fmt.Errorf("failed to keep lease alive for key %s: %w", 
key, err)
        }
-       // nolint:contextcheck
+
        go func() {
                if !e.closer.AddRunning() {
                        return
                }
                defer func() {
-                       e.l.Info().Msgf("revoking lease %d", lease.ID)
-                       ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second)
-                       _, err = e.client.Lease.Revoke(ctx, lease.ID)
-                       cancel()
-                       if err != nil {
-                               e.l.Error().Err(err).Msgf("failed to revoke 
lease %d", lease.ID)
-                       }
+                       e.revokeLease(lease)
                        e.closer.Done()
                }()
+
                for {
                        select {
                        case <-e.closer.CloseNotify():
                                return
                        case keepAliveResp := <-keepAliveChan:
                                if keepAliveResp == nil {
-                                       // The channel has been closed
-                                       return
+                                       keepAliveChan = 
e.revokeAndReconnectLease(lease, key, val)
                                }
                        }
                }
        }()
+
        return nil
 }
 
+func (e *etcdSchemaRegistry) revokeAndReconnectLease(lease 
*clientv3.LeaseGrantResponse, key, val string) <-chan 
*clientv3.LeaseKeepAliveResponse {
+       for {
+               e.revokeLease(lease)
+               select {
+               case <-e.closer.CloseNotify():
+                       return nil
+               default:
+                       lease, err := e.client.Grant(context.Background(), 
int64(leaseDuration.Seconds()))
+                       if err != nil {
+                               e.l.Error().Err(err).Msg("failed to grant 
lease")
+                               time.Sleep(leaseDuration)
+                               continue
+                       }
+                       _, err = e.client.Put(context.Background(), key, val, 
clientv3.WithLease(lease.ID))
+                       if err != nil {
+                               e.l.Error().Err(err).Msg("failed to put 
key-value pair")
+                               time.Sleep(leaseDuration)
+                               continue
+                       }
+                       keepAliveChan, err := 
e.client.KeepAlive(context.Background(), lease.ID)
+                       if err != nil {
+                               e.l.Error().Err(err).Msg("failed to keep alive")
+                               time.Sleep(leaseDuration)
+                       } else {
+                               return keepAliveChan
+                       }
+               }
+       }
+}
+
+func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) {
+       if lease == nil {
+               return
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), leaseDuration)
+       defer cancel()
+       _, err := e.client.Lease.Revoke(ctx, lease.ID)
+       if err != nil {
+               e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID)
+       }
+}
+
 func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, handler 
watchEventHandler) *watcher {
        return e.newWatcherWithRevision(name, kind, 0, handler)
 }
diff --git a/banyand/metadata/schema/etcd_test.go 
b/banyand/metadata/schema/etcd_test.go
index 6bb877ee..d288d8cb 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -21,11 +21,9 @@ import (
        "context"
        "embed"
        "fmt"
-       "os"
        "path"
        "testing"
 
-       "github.com/google/uuid"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
        "google.golang.org/protobuf/encoding/protojson"
@@ -100,11 +98,8 @@ func preloadSchema(e schema.Registry) error {
        return nil
 }
 
-func randomTempDir() string {
-       return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", 
uuid.New().String()))
-}
-
 func initServerAndRegister(t *testing.T) (schema.Registry, func()) {
+       path, defFn := test.Space(require.New(t))
        req := require.New(t)
        ports, err := test.AllocateFreePorts(2)
        if err != nil {
@@ -113,7 +108,7 @@ func initServerAndRegister(t *testing.T) (schema.Registry, 
func()) {
        endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
        server, err := embeddedetcd.NewServer(
                embeddedetcd.ConfigureListener(endpoints, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
-               embeddedetcd.RootDir(randomTempDir()))
+               embeddedetcd.RootDir(path))
        req.NoError(err)
        req.NotNil(server)
        <-server.ReadyNotify()
@@ -124,6 +119,7 @@ func initServerAndRegister(t *testing.T) (schema.Registry, 
func()) {
                server.Close()
                <-server.StopNotify()
                schemaRegistry.Close()
+               defFn()
        }
 }
 
diff --git a/banyand/metadata/schema/register_test.go 
b/banyand/metadata/schema/register_test.go
index 8e373cfc..1e9ceef8 100644
--- a/banyand/metadata/schema/register_test.go
+++ b/banyand/metadata/schema/register_test.go
@@ -20,6 +20,8 @@ package schema_test
 import (
        "context"
        "fmt"
+       "os"
+       "time"
 
        "github.com/onsi/ginkgo/v2"
        "github.com/onsi/gomega"
@@ -34,7 +36,9 @@ import (
 )
 
 var _ = ginkgo.Describe("etcd_register", func() {
-       var endpoints []string
+       var path string
+       var defFn func()
+       var endpoints, peers []string
        var goods []gleak.Goroutine
        var server embeddedetcd.Server
        var r schema.Registry
@@ -51,13 +55,17 @@ var _ = ginkgo.Describe("etcd_register", func() {
                },
        }
        ginkgo.BeforeEach(func() {
+               var err error
+               path, defFn, err = test.NewSpace()
+               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                goods = gleak.Goroutines()
                ports, err := test.AllocateFreePorts(2)
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
+               peers = []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}
                server, err = embeddedetcd.NewServer(
-                       embeddedetcd.ConfigureListener(endpoints, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
-                       embeddedetcd.RootDir(randomTempDir()))
+                       embeddedetcd.ConfigureListener(endpoints, peers),
+                       embeddedetcd.RootDir(path))
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                <-server.ReadyNotify()
                r, err = schema.NewEtcdSchemaRegistry(
@@ -70,6 +78,7 @@ var _ = ginkgo.Describe("etcd_register", func() {
                gomega.Expect(r.Close()).ShouldNot(gomega.HaveOccurred())
                server.Close()
                gomega.Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               defFn()
        })
 
        ginkgo.It("should revoke the leaser", func() {
@@ -89,4 +98,24 @@ var _ = ginkgo.Describe("etcd_register", func() {
                gomega.Expect(r.Register(context.Background(), md, 
false)).ShouldNot(gomega.HaveOccurred())
                gomega.Expect(r.Register(context.Background(), md, 
false)).Should(gomega.MatchError(schema.ErrGRPCAlreadyExists))
        })
+
+       ginkgo.It("should reconnect", func() {
+               gomega.Expect(r.Register(context.Background(), md, 
true)).ShouldNot(gomega.HaveOccurred())
+               _, err := r.GetNode(context.Background(), node)
+               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+               gomega.Expect(server.Close()).ShouldNot(gomega.HaveOccurred())
+               time.Sleep(1 * time.Second)
+               os.RemoveAll(path)
+
+               server, err = embeddedetcd.NewServer(
+                       embeddedetcd.ConfigureListener(endpoints, peers),
+                       embeddedetcd.RootDir(path))
+               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+               <-server.ReadyNotify()
+
+               gomega.Eventually(func() error {
+                       _, err := r.GetNode(context.Background(), node)
+                       return err
+               }, flags.EventuallyTimeout).ShouldNot(gomega.HaveOccurred())
+       })
 })
diff --git a/banyand/metadata/schema/watcher_test.go 
b/banyand/metadata/schema/watcher_test.go
index 472f863c..65c45b40 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -79,6 +79,7 @@ var _ = ginkgo.Describe("Watcher", func() {
                mockedObj *mockedHandler
                server    embeddedetcd.Server
                registry  schema.Registry
+               defFn     func()
        )
 
        ginkgo.BeforeEach(func() {
@@ -87,6 +88,11 @@ var _ = ginkgo.Describe("Watcher", func() {
                        Env:   "dev",
                        Level: flags.LogLevel,
                })).To(gomega.Succeed())
+               var path string
+               var err error
+               path, defFn, err = test.NewSpace()
+               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+
                ports, err := test.AllocateFreePorts(2)
                if err != nil {
                        panic("fail to find free ports")
@@ -94,7 +100,7 @@ var _ = ginkgo.Describe("Watcher", func() {
                endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
                server, err = embeddedetcd.NewServer(
                        embeddedetcd.ConfigureListener(endpoints, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
-                       embeddedetcd.RootDir(randomTempDir()))
+                       embeddedetcd.RootDir(path))
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                <-server.ReadyNotify()
                registry, err = schema.NewEtcdSchemaRegistry(
@@ -107,6 +113,7 @@ var _ = ginkgo.Describe("Watcher", func() {
                registry.Close()
                server.Close()
                <-server.StopNotify()
+               defFn()
        })
 
        ginkgo.It("should handle all existing key-value pairs on initial load", 
func() {
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 9743503f..b967083a 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -96,7 +96,16 @@ func (l *Logger) ToZapConfig() zap.Config {
        }
        if !l.development {
                config := zap.NewProductionConfig()
-               config.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
+               switch l.GetLevel() {
+               case zerolog.DebugLevel:
+                       config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
+               case zerolog.InfoLevel:
+                       config.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
+               case zerolog.WarnLevel:
+                       config.Level = zap.NewAtomicLevelAt(zap.WarnLevel)
+               default:
+                       config.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
+               }
                return config
        }
        encoderConfig := zapcore.EncoderConfig{

Reply via email to