This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new e85aeb57 Fix several bugs (#473)
e85aeb57 is described below

commit e85aeb5790a10b8a26db628fc15140a034865009
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Jun 21 14:11:33 2024 +0800

    Fix several bugs (#473)
---
 CHANGES.md                                |  2 +
 banyand/metadata/client.go                |  2 +-
 banyand/metadata/embeddedserver/server.go |  4 +-
 banyand/observability/service.go          |  2 +-
 banyand/queue/pub/client.go               | 10 ++--
 banyand/stream/query.go                   |  9 +++-
 bydbctl/internal/cmd/property.go          |  4 +-
 bydbctl/internal/cmd/property_test.go     | 79 +++++++++++++++++++++++++++++++
 bydbctl/internal/cmd/root.go              |  2 +-
 pkg/cmdsetup/root.go                      |  4 +-
 pkg/index/inverted/inverted.go            |  4 ++
 pkg/index/inverted/sort.go                |  3 ++
 12 files changed, 109 insertions(+), 16 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 52332c17..13a0e2b6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,8 @@ Release Notes.
 - Fix querying old data points when the data is in a newer part. A version 
column is introduced to each data point and stored in the timestamp file.
 - Fix the bug that duplicated data points from different data nodes are 
returned.
 - Fix the bug that the data node can't re-register to etcd when the connection 
is lost.
+- Fix memory leak in sorting the stream by the inverted index.
+- Fix the wrong array flags parsing in command line. The array flags should be 
parsed by "StringSlice" instead of "StringArray".
 
 ## 0.6.1
 
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index d6495738..c60ef7d0 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -78,7 +78,7 @@ func (s *clientService) SchemaRegistry() schema.Registry {
 func (s *clientService) FlagSet() *run.FlagSet {
        fs := run.NewFlagSet("metadata")
        fs.StringVar(&s.namespace, "namespace", DefaultNamespace, "The 
namespace of the metadata stored in etcd")
-       fs.StringArrayVar(&s.endpoints, FlagEtcdEndpointsName, 
[]string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints")
+       fs.StringSliceVar(&s.endpoints, FlagEtcdEndpointsName, 
[]string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints")
        fs.StringVar(&s.etcdUsername, flagEtcdUsername, "", "A username of 
etcd")
        fs.StringVar(&s.etcdPassword, flagEtcdPassword, "", "A password of etcd 
user")
        fs.StringVar(&s.etcdTLSCAFile, flagEtcdTLSCAFile, "", "Trusted 
certificate authority")
diff --git a/banyand/metadata/embeddedserver/server.go 
b/banyand/metadata/embeddedserver/server.go
index cc0425a3..3fbe9f6c 100644
--- a/banyand/metadata/embeddedserver/server.go
+++ b/banyand/metadata/embeddedserver/server.go
@@ -48,8 +48,8 @@ func (s *server) Role() databasev1.Role {
 func (s *server) FlagSet() *run.FlagSet {
        fs := run.NewFlagSet("metadata")
        fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path 
of metadata")
-       fs.StringArrayVar(&s.listenClientURL, "etcd-listen-client-url", 
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
-       fs.StringArrayVar(&s.listenPeerURL, "etcd-listen-peer-url", 
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
+       fs.StringSliceVar(&s.listenClientURL, "etcd-listen-client-url", 
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
+       fs.StringSliceVar(&s.listenPeerURL, "etcd-listen-peer-url", 
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
        return fs
 }
 
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index ba46f2d2..a0b8baf2 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -77,7 +77,7 @@ type metricService struct {
 func (p *metricService) FlagSet() *run.FlagSet {
        flagSet := run.NewFlagSet("observability")
        flagSet.StringVar(&p.listenAddr, "observability-listener-addr", 
":2121", "listen addr for observability")
-       flagSet.StringArrayVar(&p.modes, "observability-modes", 
[]string{"prometheus"}, "modes for observability")
+       flagSet.StringSliceVar(&p.modes, "observability-modes", 
[]string{"prometheus"}, "modes for observability")
        return flagSet
 }
 
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 1ebeb463..7adc5978 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -115,8 +115,8 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
                        }
                        p.log.Info().Str("status", p.dump()).Str("node", 
name).Msg("node is removed from active queue by the new gRPC address updated 
event")
                }
-               p.registered[name] = node
        }
+       p.registered[name] = node
 
        if _, ok := p.active[name]; ok {
                return
@@ -186,14 +186,14 @@ func (p *pub) OnDelete(md schema.Metadata) {
                                                elapsed += backoff
                                                p.mu.Lock()
                                                defer p.mu.Unlock()
-                                               if p.removeNodeIfUnhealthy(md, 
node, client) {
-                                                       
p.log.Info().Str("status", p.dump()).Stringer("node", node).Dur("after", 
elapsed).Msg("remove node from active queue by delete event")
-                                                       return true
-                                               }
                                                if _, ok := p.registered[name]; 
ok {
                                                        // The client has been 
added back to registered clients map, just return
                                                        return true
                                                }
+                                               if p.removeNodeIfUnhealthy(md, 
node, client) {
+                                                       
p.log.Info().Str("status", p.dump()).Stringer("node", node).Dur("after", 
elapsed).Msg("remove node from active queue by delete event")
+                                                       return true
+                                               }
                                                return false
                                        }() {
                                                return
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 2edff01c..c614a392 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -422,7 +422,7 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (sqr pb
        if err != nil {
                return nil, err
        }
-       elementRefList, sortedRefMap, err := indexSort(s, sqo, tabWrappers, 
seriesList, filteredRefMap)
+       elementRefList, sortedRefMap, err := s.indexSort(sqo, tabWrappers, 
seriesList, filteredRefMap)
        if err != nil {
                return nil, err
        }
@@ -547,7 +547,7 @@ func indexSearch(ctx context.Context, sqo 
pbv1.StreamQueryOptions,
        return filteredRefMap, nil
 }
 
-func indexSort(s *stream, sqo pbv1.StreamQueryOptions, tabWrappers 
[]storage.TSTableWrapper[*tsTable],
+func (s *stream) indexSort(sqo pbv1.StreamQueryOptions, tabWrappers 
[]storage.TSTableWrapper[*tsTable],
        seriesList pbv1.SeriesList, filteredRefMap map[common.SeriesID][]int64,
 ) ([]*elementRef, map[common.SeriesID][]int64, error) {
        if sqo.Order == nil || sqo.Order.Index == nil {
@@ -559,6 +559,11 @@ func indexSort(s *stream, sqo pbv1.StreamQueryOptions, 
tabWrappers []storage.TST
        }
        desc := sqo.Order != nil && sqo.Order.Index == nil && sqo.Order.Sort == 
modelv1.Sort_SORT_DESC
        itemIter := itersort.NewItemIter[*index.ItemRef](iters, desc)
+       defer func() {
+               if err := itemIter.Close(); err != nil {
+                       s.l.Err(err).Msg("failed to close itemIter")
+               }
+       }()
 
        var elementRefList []*elementRef
        sortedRefMap := make(map[common.SeriesID][]int64)
diff --git a/bydbctl/internal/cmd/property.go b/bydbctl/internal/cmd/property.go
index 3f608725..e8aacbe7 100644
--- a/bydbctl/internal/cmd/property.go
+++ b/bydbctl/internal/cmd/property.go
@@ -115,8 +115,8 @@ func newPropertyCmd() *cobra.Command {
                },
        }
        listCmd.Flags().StringVarP(&name, "name", "n", "", "the name of the 
resource")
-       listCmd.Flags().StringArrayVarP(&ids, "ids", "", nil, "id selector")
-       listCmd.Flags().StringArrayVarP(&tags, "tags", "t", nil, "tag selector")
+       listCmd.Flags().StringSliceVarP(&ids, "ids", "", nil, "id selector")
+       listCmd.Flags().StringSliceVarP(&tags, "tags", "t", nil, "tag selector")
 
        var leaseID int64
        keepAliveCmd := &cobra.Command{
diff --git a/bydbctl/internal/cmd/property_test.go 
b/bydbctl/internal/cmd/property_test.go
index 2ad70ab2..fc57c965 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -295,6 +295,85 @@ tags:
                helpers.UnmarshalYAML([]byte(out), resp)
                Expect(resp.Property).To(HaveLen(2))
        })
+
+       It("list properties in a container by id", func() {
+               // create another property for list operation
+               rootCmd.SetArgs([]string{"property", "apply", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  container:
+    group: ui-template 
+    name: service
+  id: spring
+tags:
+  - key: content
+    value:
+      str:
+        value: bar
+  - key: state
+    value:
+      int:
+        value: 1
+`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("created: true"))
+               Expect(out).To(ContainSubstring("tagsNum: 2"))
+
+               rootCmd.SetArgs([]string{"property", "list", "-g", 
"ui-template", "-n", "service", "--ids", "spring"})
+               out = capturer.CaptureStdout(func() {
+                       cmd.ResetFlags()
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := new(propertyv1.ListResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               Expect(resp.Property).To(HaveLen(1))
+               Expect(resp.Property[0].Metadata.GetId()).To(Equal("spring"))
+               Expect(resp.Property[0].Tags).To(HaveLen(2))
+       })
+
+       It("list properties in a container by ids and tags", func() {
+               // create another property for list operation
+               rootCmd.SetArgs([]string{"property", "apply", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  container:
+    group: ui-template 
+    name: service
+  id: spring
+tags:
+  - key: content
+    value:
+      str:
+        value: bar
+  - key: state
+    value:
+      int:
+        value: 1
+`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("created: true"))
+               Expect(out).To(ContainSubstring("tagsNum: 2"))
+
+               rootCmd.SetArgs([]string{"property", "list", "-g", 
"ui-template", "-n", "service", "--ids", "spring", "--tags", "content"})
+               out = capturer.CaptureStdout(func() {
+                       cmd.ResetFlags()
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := new(propertyv1.ListResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               Expect(resp.Property).To(HaveLen(1))
+               Expect(resp.Property[0].Metadata.GetId()).To(Equal("spring"))
+               Expect(resp.Property[0].Tags).To(HaveLen(1))
+       })
+
        It("keepalive not found", func() {
                rootCmd.SetArgs([]string{
                        "property", "keepalive", "-i", "111",
diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go
index f9645fca..5f562281 100644
--- a/bydbctl/internal/cmd/root.go
+++ b/bydbctl/internal/cmd/root.go
@@ -139,7 +139,7 @@ func bindNameAndIDAndTagsFlag(commands ...*cobra.Command) {
        bindNameFlag(commands...)
        for _, c := range commands {
                c.Flags().StringVarP(&id, "id", "i", "", "the property's id")
-               c.Flags().StringArrayVarP(&tags, "tags", "t", nil, "the 
property's tags")
+               c.Flags().StringSliceVarP(&tags, "tags", "t", nil, "the 
property's tags")
                _ = c.MarkFlagRequired("name")
                _ = c.MarkFlagRequired("id")
        }
diff --git a/pkg/cmdsetup/root.go b/pkg/cmdsetup/root.go
index 1e49fbc5..c767ce20 100644
--- a/pkg/cmdsetup/root.go
+++ b/pkg/cmdsetup/root.go
@@ -62,8 +62,8 @@ BanyanDB, as an observability database, aims to ingest, 
analyze and store Metric
        cmd.PersistentFlags().StringVar(&common.FlagNodeHost, "node-host", "", 
"the node host of the server only used when node-host-provider is \"flag\"")
        cmd.PersistentFlags().StringVar(&logging.Env, "logging-env", "prod", 
"the logging")
        cmd.PersistentFlags().StringVar(&logging.Level, "logging-level", 
"info", "the root level of logging")
-       cmd.PersistentFlags().StringArrayVar(&logging.Modules, 
"logging-modules", nil, "the specific module")
-       cmd.PersistentFlags().StringArrayVar(&logging.Levels, "logging-levels", 
nil, "the level logging of logging")
+       cmd.PersistentFlags().StringSliceVar(&logging.Modules, 
"logging-modules", nil, "the specific module")
+       cmd.PersistentFlags().StringSliceVar(&logging.Levels, "logging-levels", 
nil, "the level logging of logging")
        cmd.AddCommand(newStandaloneCmd(runners...))
        cmd.AddCommand(newDataCmd(runners...))
        cmd.AddCommand(newLiaisonCmd(runners...))
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index b1d93d73..95ec0c9c 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -166,6 +166,9 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange 
index.RangeOpts, ord
                bytes.Compare(termRange.Lower, termRange.Upper) > 0 {
                return index.DummyFieldIterator, nil
        }
+       if !s.closer.AddRunning() {
+               return nil, nil
+       }
 
        reader, err := s.writer.Reader()
        if err != nil {
@@ -211,6 +214,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange 
index.RangeOpts, ord
                sortedKey: sortedKey,
                size:      preLoadSize,
                sid:       fieldKey.SeriesID,
+               closer:    s.closer,
        }
        return result, nil
 }
diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go
index 0447e255..e4d6854e 100644
--- a/pkg/index/inverted/sort.go
+++ b/pkg/index/inverted/sort.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order 
modelv1.Sort, preLoadSize int) (iter index.FieldIterator[*index.ItemRef], err 
error) {
@@ -72,6 +73,7 @@ type sortIterator struct {
        err       error
        reader    *bluge.Reader
        current   *blugeMatchIterator
+       closer    *run.Closer
        sortedKey string
        size      int
        skipped   int
@@ -132,6 +134,7 @@ func (si *sortIterator) Val() *index.ItemRef {
 }
 
 func (si *sortIterator) Close() error {
+       defer si.closer.Done()
        if errors.Is(si.err, io.EOF) {
                si.err = nil
                if si.current != nil {

Reply via email to