This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch patch in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c67047e65abd8a41ecfed542a429faf3c1d1d574 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Fri Jun 21 13:37:42 2024 +0800 Fix several bugs Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- CHANGES.md | 2 + banyand/metadata/client.go | 2 +- banyand/metadata/embeddedserver/server.go | 4 +- banyand/observability/service.go | 2 +- banyand/queue/pub/client.go | 10 ++-- banyand/stream/query.go | 9 +++- bydbctl/internal/cmd/property.go | 4 +- bydbctl/internal/cmd/property_test.go | 79 +++++++++++++++++++++++++++++++ bydbctl/internal/cmd/root.go | 2 +- pkg/cmdsetup/root.go | 4 +- pkg/index/inverted/inverted.go | 4 ++ pkg/index/inverted/sort.go | 3 ++ 12 files changed, 109 insertions(+), 16 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 52332c17..13a0e2b6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -15,6 +15,8 @@ Release Notes. - Fix querying old data points when the data is in a newer part. A version column is introduced to each data point and stored in the timestamp file. - Fix the bug that duplicated data points from different data nodes are returned. - Fix the bug that the data node can't re-register to etcd when the connection is lost. +- Fix memory leak in sorting the stream by the inverted index. +- Fix the wrong array flags parsing in command line. The array flags should be parsed by "StringSlice" instead of "StringArray". ## 0.6.1 diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index d6495738..c60ef7d0 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -78,7 +78,7 @@ func (s *clientService) SchemaRegistry() schema.Registry { func (s *clientService) FlagSet() *run.FlagSet { fs := run.NewFlagSet("metadata") fs.StringVar(&s.namespace, "namespace", DefaultNamespace, "The namespace of the metadata stored in etcd") - fs.StringArrayVar(&s.endpoints, FlagEtcdEndpointsName, []string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints") + fs.StringSliceVar(&s.endpoints, FlagEtcdEndpointsName, []string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints") fs.StringVar(&s.etcdUsername, flagEtcdUsername, "", "A username of etcd") fs.StringVar(&s.etcdPassword, flagEtcdPassword, "", "A password of etcd user") fs.StringVar(&s.etcdTLSCAFile, flagEtcdTLSCAFile, "", "Trusted certificate authority") diff --git a/banyand/metadata/embeddedserver/server.go b/banyand/metadata/embeddedserver/server.go index cc0425a3..3fbe9f6c 100644 --- a/banyand/metadata/embeddedserver/server.go +++ b/banyand/metadata/embeddedserver/server.go @@ -48,8 +48,8 @@ func (s *server) Role() databasev1.Role { func (s *server) FlagSet() *run.FlagSet { fs := run.NewFlagSet("metadata") fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path of metadata") - fs.StringArrayVar(&s.listenClientURL, "etcd-listen-client-url", []string{"http://localhost:2379"}, "A URL to listen on for client traffic") - fs.StringArrayVar(&s.listenPeerURL, "etcd-listen-peer-url", []string{"http://localhost:2380"}, "A URL to listen on for peer traffic") + fs.StringSliceVar(&s.listenClientURL, "etcd-listen-client-url", []string{"http://localhost:2379"}, "A URL to listen on for client traffic") + fs.StringSliceVar(&s.listenPeerURL, "etcd-listen-peer-url", []string{"http://localhost:2380"}, "A URL to listen on for peer traffic") return fs } diff --git a/banyand/observability/service.go b/banyand/observability/service.go index ba46f2d2..a0b8baf2 100644 --- a/banyand/observability/service.go +++ b/banyand/observability/service.go @@ -77,7 +77,7 @@ type metricService struct { func (p *metricService) FlagSet() *run.FlagSet { flagSet := run.NewFlagSet("observability") flagSet.StringVar(&p.listenAddr, "observability-listener-addr", ":2121", "listen addr for observability") - flagSet.StringArrayVar(&p.modes, "observability-modes", []string{"prometheus"}, "modes for observability") + flagSet.StringSliceVar(&p.modes, "observability-modes", []string{"prometheus"}, "modes for observability") return flagSet } diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go index 1ebeb463..7adc5978 100644 --- a/banyand/queue/pub/client.go +++ b/banyand/queue/pub/client.go @@ -115,8 +115,8 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) { } p.log.Info().Str("status", p.dump()).Str("node", name).Msg("node is removed from active queue by the new gRPC address updated event") } - p.registered[name] = node } + p.registered[name] = node if _, ok := p.active[name]; ok { return @@ -186,14 +186,14 @@ func (p *pub) OnDelete(md schema.Metadata) { elapsed += backoff p.mu.Lock() defer p.mu.Unlock() - if p.removeNodeIfUnhealthy(md, node, client) { - p.log.Info().Str("status", p.dump()).Stringer("node", node).Dur("after", elapsed).Msg("remove node from active queue by delete event") - return true - } if _, ok := p.registered[name]; ok { // The client has been added back to registered clients map, just return return true } + if p.removeNodeIfUnhealthy(md, node, client) { + p.log.Info().Str("status", p.dump()).Stringer("node", node).Dur("after", elapsed).Msg("remove node from active queue by delete event") + return true + } return false }() { return diff --git a/banyand/stream/query.go b/banyand/stream/query.go index 2edff01c..c614a392 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -422,7 +422,7 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr pb if err != nil { return nil, err } - elementRefList, sortedRefMap, err := indexSort(s, sqo, tabWrappers, seriesList, filteredRefMap) + elementRefList, sortedRefMap, err := s.indexSort(sqo, tabWrappers, seriesList, filteredRefMap) if err != nil { return nil, err } @@ -547,7 +547,7 @@ func indexSearch(ctx context.Context, sqo pbv1.StreamQueryOptions, return filteredRefMap, nil } -func indexSort(s *stream, sqo pbv1.StreamQueryOptions, tabWrappers []storage.TSTableWrapper[*tsTable], +func (s *stream) indexSort(sqo pbv1.StreamQueryOptions, tabWrappers []storage.TSTableWrapper[*tsTable], seriesList pbv1.SeriesList, filteredRefMap map[common.SeriesID][]int64, ) ([]*elementRef, map[common.SeriesID][]int64, error) { if sqo.Order == nil || sqo.Order.Index == nil { @@ -559,6 +559,11 @@ func indexSort(s *stream, sqo pbv1.StreamQueryOptions, tabWrappers []storage.TST } desc := sqo.Order != nil && sqo.Order.Index == nil && sqo.Order.Sort == modelv1.Sort_SORT_DESC itemIter := itersort.NewItemIter[*index.ItemRef](iters, desc) + defer func() { + if err := itemIter.Close(); err != nil { + s.l.Err(err).Msg("failed to close itemIter") + } + }() var elementRefList []*elementRef sortedRefMap := make(map[common.SeriesID][]int64) diff --git a/bydbctl/internal/cmd/property.go b/bydbctl/internal/cmd/property.go index 3f608725..e8aacbe7 100644 --- a/bydbctl/internal/cmd/property.go +++ b/bydbctl/internal/cmd/property.go @@ -115,8 +115,8 @@ func newPropertyCmd() *cobra.Command { }, } listCmd.Flags().StringVarP(&name, "name", "n", "", "the name of the resource") - listCmd.Flags().StringArrayVarP(&ids, "ids", "", nil, "id selector") - listCmd.Flags().StringArrayVarP(&tags, "tags", "t", nil, "tag selector") + listCmd.Flags().StringSliceVarP(&ids, "ids", "", nil, "id selector") + listCmd.Flags().StringSliceVarP(&tags, "tags", "t", nil, "tag selector") var leaseID int64 keepAliveCmd := &cobra.Command{ diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go index 2ad70ab2..fc57c965 100644 --- a/bydbctl/internal/cmd/property_test.go +++ b/bydbctl/internal/cmd/property_test.go @@ -295,6 +295,85 @@ tags: helpers.UnmarshalYAML([]byte(out), resp) Expect(resp.Property).To(HaveLen(2)) }) + + It("list properties in a container by id", func() { + // create another property for list operation + rootCmd.SetArgs([]string{"property", "apply", "-f", "-"}) + rootCmd.SetIn(strings.NewReader(` +metadata: + container: + group: ui-template + name: service + id: spring +tags: + - key: content + value: + str: + value: bar + - key: state + value: + int: + value: 1 +`)) + out := capturer.CaptureStdout(func() { + err := rootCmd.Execute() + Expect(err).NotTo(HaveOccurred()) + }) + Expect(out).To(ContainSubstring("created: true")) + Expect(out).To(ContainSubstring("tagsNum: 2")) + + rootCmd.SetArgs([]string{"property", "list", "-g", "ui-template", "-n", "service", "--ids", "spring"}) + out = capturer.CaptureStdout(func() { + cmd.ResetFlags() + err := rootCmd.Execute() + Expect(err).NotTo(HaveOccurred()) + }) + resp := new(propertyv1.ListResponse) + helpers.UnmarshalYAML([]byte(out), resp) + Expect(resp.Property).To(HaveLen(1)) + Expect(resp.Property[0].Metadata.GetId()).To(Equal("spring")) + Expect(resp.Property[0].Tags).To(HaveLen(2)) + }) + + It("list properties in a container by ids and tags", func() { + // create another property for list operation + rootCmd.SetArgs([]string{"property", "apply", "-f", "-"}) + rootCmd.SetIn(strings.NewReader(` +metadata: + container: + group: ui-template + name: service + id: spring +tags: + - key: content + value: + str: + value: bar + - key: state + value: + int: + value: 1 +`)) + out := capturer.CaptureStdout(func() { + err := rootCmd.Execute() + Expect(err).NotTo(HaveOccurred()) + }) + Expect(out).To(ContainSubstring("created: true")) + Expect(out).To(ContainSubstring("tagsNum: 2")) + + rootCmd.SetArgs([]string{"property", "list", "-g", "ui-template", "-n", "service", "--ids", "spring", "--tags", "content"}) + out = capturer.CaptureStdout(func() { + cmd.ResetFlags() + err := rootCmd.Execute() + Expect(err).NotTo(HaveOccurred()) + }) + resp := new(propertyv1.ListResponse) + helpers.UnmarshalYAML([]byte(out), resp) + Expect(resp.Property).To(HaveLen(1)) + Expect(resp.Property[0].Metadata.GetId()).To(Equal("spring")) + Expect(resp.Property[0].Tags).To(HaveLen(1)) + }) + It("keepalive not found", func() { rootCmd.SetArgs([]string{ "property", "keepalive", "-i", "111", diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go index f9645fca..5f562281 100644 --- a/bydbctl/internal/cmd/root.go +++ b/bydbctl/internal/cmd/root.go @@ -139,7 +139,7 @@ func bindNameAndIDAndTagsFlag(commands ...*cobra.Command) { bindNameFlag(commands...) for _, c := range commands { c.Flags().StringVarP(&id, "id", "i", "", "the property's id") - c.Flags().StringArrayVarP(&tags, "tags", "t", nil, "the property's tags") + c.Flags().StringSliceVarP(&tags, "tags", "t", nil, "the property's tags") _ = c.MarkFlagRequired("name") _ = c.MarkFlagRequired("id") } diff --git a/pkg/cmdsetup/root.go b/pkg/cmdsetup/root.go index 1e49fbc5..c767ce20 100644 --- a/pkg/cmdsetup/root.go +++ b/pkg/cmdsetup/root.go @@ -62,8 +62,8 @@ BanyanDB, as an observability database, aims to ingest, analyze and store Metric cmd.PersistentFlags().StringVar(&common.FlagNodeHost, "node-host", "", "the node host of the server only used when node-host-provider is \"flag\"") cmd.PersistentFlags().StringVar(&logging.Env, "logging-env", "prod", "the logging") cmd.PersistentFlags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - cmd.PersistentFlags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - cmd.PersistentFlags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") + cmd.PersistentFlags().StringSliceVar(&logging.Modules, "logging-modules", nil, "the specific module") + cmd.PersistentFlags().StringSliceVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") cmd.AddCommand(newStandaloneCmd(runners...)) cmd.AddCommand(newDataCmd(runners...)) cmd.AddCommand(newLiaisonCmd(runners...)) diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index b1d93d73..95ec0c9c 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -166,6 +166,9 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord bytes.Compare(termRange.Lower, termRange.Upper) > 0 { return index.DummyFieldIterator, nil } + if !s.closer.AddRunning() { + return nil, nil + } reader, err := s.writer.Reader() if err != nil { @@ -211,6 +214,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord sortedKey: sortedKey, size: preLoadSize, sid: fieldKey.SeriesID, + closer: s.closer, } return result, nil } diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go index 0447e255..e4d6854e 100644 --- a/pkg/index/inverted/sort.go +++ b/pkg/index/inverted/sort.go @@ -29,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/run" ) func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, preLoadSize int) (iter index.FieldIterator[*index.ItemRef], err error) { @@ -72,6 +73,7 @@ type sortIterator struct { err error reader *bluge.Reader current *blugeMatchIterator + closer *run.Closer sortedKey string size int skipped int @@ -132,6 +134,7 @@ func (si *sortIterator) Val() *index.ItemRef { } func (si *sortIterator) Close() error { + defer si.closer.Done() if errors.Is(si.err, io.EOF) { si.err = nil if si.current != nil {