This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch index-term
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7733284fbb67845aa7057fb6be360d4a0119199b Author: Gao Hongtao <[email protected]> AuthorDate: Fri Jul 29 14:13:10 2022 +0000 Remove term metadata store Signed-off-by: Gao Hongtao <[email protected]> --- banyand/query/processor.go | 2 +- banyand/tsdb/indexdb.go | 6 +-- pkg/index/index.go | 34 +--------------- pkg/index/inverted/inverted.go | 29 ++++---------- pkg/index/inverted/mem.go | 23 +++++------ pkg/index/iterator.go | 17 ++++---- pkg/index/lsm/lsm.go | 31 ++++----------- pkg/index/lsm/search.go | 6 +-- pkg/index/metadata/metadata.go | 79 ------------------------------------- pkg/index/testcases/service_name.go | 2 +- scripts/build/test.mk | 6 +-- 11 files changed, 45 insertions(+), 190 deletions(-) diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 95a449c..cb7b01f 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -67,7 +67,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { p.log.Warn().Msg("invalid event data type") return } - p.log.Info().Msg("received a query event") + p.log.Debug().Stringer("criteria", queryCriteria).Msg("received a query request") meta := queryCriteria.GetMetadata() ec, err := p.streamService.Stream(meta) diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go index 4c8b958..261b39e 100644 --- a/banyand/tsdb/indexdb.go +++ b/banyand/tsdb/indexdb.go @@ -58,7 +58,7 @@ type indexDB struct { func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) { result := make([]GlobalItemID, 0) - f, err := field.MarshalStraight() + f, err := field.Marshal() if err != nil { return nil, err } @@ -154,7 +154,7 @@ func (i *indexWriter) WriteLSMIndex(field index.Field) error { if i.scope != nil { field.Key.SeriesID = GlobalSeriesID(i.scope) } - key, err := field.MarshalStraight() + key, err := field.Marshal() if err != nil { return err } @@ -165,7 +165,7 @@ func (i *indexWriter) WriteInvertedIndex(field index.Field) error { if i.scope != nil { 
field.Key.SeriesID = GlobalSeriesID(i.scope) } - key, err := field.MarshalStraight() + key, err := field.Marshal() if err != nil { return err } diff --git a/pkg/index/index.go b/pkg/index/index.go index ebedc32..192a94f 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -27,7 +27,6 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/convert" - "github.com/apache/skywalking-banyandb/pkg/index/metadata" "github.com/apache/skywalking-banyandb/pkg/index/posting" ) @@ -68,40 +67,11 @@ type Field struct { Term []byte } -func (f Field) MarshalStraight() ([]byte, error) { +func (f Field) Marshal() ([]byte, error) { return bytes.Join([][]byte{f.Key.Marshal(), f.Term}, nil), nil } -func (f Field) Marshal(term metadata.Term) ([]byte, error) { - var t []byte - if f.Key.EncodeTerm { - var err error - t, err = term.ID(f.Term) - if err != nil { - return nil, errors.Wrap(err, "get term id") - } - f.Term = t - } - return f.MarshalStraight() -} - -func (f *Field) Unmarshal(term metadata.Term, raw []byte) error { - err := f.UnmarshalStraight(raw) - if err != nil { - return err - } - if !f.Key.EncodeTerm { - return nil - } - t, err := term.Literal(f.Term) - if err != nil { - return errors.Wrap(err, "get term literal from metadata store") - } - f.Term = t - return nil -} - -func (f *Field) UnmarshalStraight(raw []byte) error { +func (f *Field) Unmarshal(raw []byte) error { fk := &f.Key err := fk.Unmarshal(raw[:len(raw)-8]) if err != nil { diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 8073d19..c126a6b 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -29,7 +29,6 @@ import ( "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/index" - 
"github.com/apache/skywalking-banyandb/pkg/index/metadata" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -38,7 +37,6 @@ import ( var _ index.Store = (*store)(nil) type store struct { - termMetadata metadata.Term diskTable kv.IndexStore memTable *memTable immutableMemTable *memTable @@ -57,23 +55,15 @@ func NewStore(opts StoreOpts) (index.Store, error) { if err != nil { return nil, err } - var md metadata.Term - if md, err = metadata.NewTerm(metadata.TermOpts{ - Path: opts.Path + "/tmd", - Logger: opts.Logger, - }); err != nil { - return nil, err - } return &store{ - memTable: newMemTable(), - diskTable: diskTable, - termMetadata: md, - l: opts.Logger, + memTable: newMemTable(), + diskTable: diskTable, + l: opts.Logger, }, nil } func (s *store) Close() error { - return multierr.Combine(s.diskTable.Close(), s.termMetadata.Close()) + return s.diskTable.Close() } func (s *store) Write(field index.Field, chunkID common.ItemID) error { @@ -92,7 +82,7 @@ func (s *store) Flush() error { s.memTable = newMemTable() } err := s.diskTable. 
- Handover(s.immutableMemTable.Iter(s.termMetadata)) + Handover(s.immutableMemTable.Iter()) if err != nil { return err } @@ -104,9 +94,6 @@ func (s *store) Stats() observability.Statistics { stat := s.mainStats() disk := s.diskTable.Stats() stat.MaxMemBytes = disk.MaxMemBytes - term := s.termMetadata.Stats() - stat.MemBytes += term.MemBytes - stat.MaxMemBytes += term.MaxMemBytes return stat } @@ -127,7 +114,7 @@ func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) { } func (s *store) MatchTerms(field index.Field) (posting.List, error) { - f, err := field.Marshal(s.termMetadata) + f, err := field.Marshal() if err != nil { return nil, err } @@ -197,7 +184,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, } iters = append(iters, it) } - it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable, s.termMetadata, + it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable, func(term, val []byte, delegated kv.Iterator) (*index.PostingValue, error) { list := roaring.NewPostingList() err := list.Unmarshall(val) @@ -214,7 +201,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, f := index.Field{ Key: fieldKey, } - err := f.Unmarshal(s.termMetadata, delegated.Key()) + err := f.Unmarshal(delegated.Key()) if err != nil { return nil, err } diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go index 4d78f69..1b70b2c 100644 --- a/pkg/index/inverted/mem.go +++ b/pkg/index/inverted/mem.go @@ -28,7 +28,6 @@ import ( "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/index/metadata" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" ) @@ -147,14 +146,13 @@ func (m *memTable) MatchTerms(field index.Field) 
(posting.List, error) { var _ kv.Iterator = (*flushIterator)(nil) type flushIterator struct { - fieldIdx int - termIdx int - key []byte - value []byte - fields *fieldMap - valid bool - err error - termMetadata metadata.Term + fieldIdx int + termIdx int + key []byte + value []byte + fields *fieldMap + valid bool + err error } func (i *flushIterator) Next() { @@ -228,7 +226,7 @@ func (i *flushIterator) setCurr() bool { Key: term.key, Term: value.Term, } - i.key, err = f.Marshal(i.termMetadata) + i.key, err = f.Marshal() if err != nil { i.err = multierr.Append(i.err, err) return false @@ -236,9 +234,8 @@ func (i *flushIterator) setCurr() bool { return true } -func (m *memTable) Iter(termMetadata metadata.Term) kv.Iterator { +func (m *memTable) Iter() kv.Iterator { return &flushIterator{ - fields: m.fields, - termMetadata: termMetadata, + fields: m.fields, } } diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go index 559802f..fe94b44 100644 --- a/pkg/index/iterator.go +++ b/pkg/index/iterator.go @@ -26,7 +26,6 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/pkg/convert" - "github.com/apache/skywalking-banyandb/pkg/index/metadata" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -89,7 +88,7 @@ func (f *FieldIteratorTemplate) Close() error { } func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, iterable kv.Iterable, - metadata metadata.Term, fn CompositePostingValueFn, + fn CompositePostingValueFn, ) (*FieldIteratorTemplate, error) { if termRange.Upper == nil { termRange.Upper = DefaultUpper @@ -118,12 +117,12 @@ func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange Ran Key: fieldKey, Term: term, } - seekKey, err := field.Marshal(metadata) + seekKey, err := field.Marshal() if err != nil { return nil, err } return &FieldIteratorTemplate{ - delegated: 
newDelegateIterator(iter, fieldKey, metadata, l), + delegated: newDelegateIterator(iter, fieldKey, l), termRange: termRange, fn: fn, reverse: reverse, @@ -131,11 +130,11 @@ func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange Ran }, nil } -func parseKey(fieldKey FieldKey, metadata metadata.Term, key []byte) (Field, error) { +func parseKey(fieldKey FieldKey, key []byte) (Field, error) { f := &Field{ Key: fieldKey, } - err := f.Unmarshal(metadata, key) + err := f.Unmarshal(key) if err != nil { return *f, err } @@ -235,20 +234,18 @@ type delegateIterator struct { delegated kv.Iterator fieldKey FieldKey fieldKeyBytes []byte - metadata metadata.Term l *logger.Logger curField Field closed bool } -func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, metadata metadata.Term, l *logger.Logger) *delegateIterator { +func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, l *logger.Logger) *delegateIterator { fieldKeyBytes := fieldKey.Marshal() return &delegateIterator{ delegated: delegated, fieldKey: fieldKey, fieldKeyBytes: fieldKeyBytes, - metadata: metadata, l: l, } } @@ -282,7 +279,7 @@ func (di *delegateIterator) Valid() bool { return false } var err error - di.curField, err = parseKey(di.fieldKey, di.metadata, di.Key()) + di.curField, err = parseKey(di.fieldKey, di.Key()) if err != nil { di.l.Error().Err(err).Msg("fail to parse field from key") di.Close() diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go index b2ef18c..53de61b 100644 --- a/pkg/index/lsm/lsm.go +++ b/pkg/index/lsm/lsm.go @@ -18,23 +18,19 @@ package lsm import ( - "go.uber.org/multierr" - "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/index/metadata" 
"github.com/apache/skywalking-banyandb/pkg/logger" ) var _ index.Store = (*store)(nil) type store struct { - lsm kv.Store - termMetadata metadata.Term - l *logger.Logger + lsm kv.Store + l *logger.Logger } func (*store) Flush() error { @@ -42,20 +38,15 @@ func (*store) Flush() error { } func (s *store) Stats() observability.Statistics { - store := s.lsm.Stats() - term := s.termMetadata.Stats() - return observability.Statistics{ - MemBytes: store.MemBytes + term.MemBytes, - MaxMemBytes: store.MaxMemBytes + term.MaxMemBytes, - } + return s.lsm.Stats() } func (s *store) Close() error { - return multierr.Combine(s.lsm.Close(), s.termMetadata.Close()) + return s.lsm.Close() } func (s *store) Write(field index.Field, itemID common.ItemID) error { - f, err := field.Marshal(s.termMetadata) + f, err := field.Marshal() if err != nil { return err } @@ -74,16 +65,8 @@ func NewStore(opts StoreOpts) (index.Store, error) { if lsm, err = kv.OpenStore(0, opts.Path+"/lsm", kv.StoreWithLogger(opts.Logger)); err != nil { return nil, err } - var md metadata.Term - if md, err = metadata.NewTerm(metadata.TermOpts{ - Path: opts.Path + "/tmd", - Logger: opts.Logger, - }); err != nil { - return nil, err - } return &store{ - lsm: lsm, - termMetadata: md, - l: opts.Logger, + lsm: lsm, + l: opts.Logger, }, nil } diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go index 43f1948..5e877fa 100644 --- a/pkg/index/lsm/search.go +++ b/pkg/index/lsm/search.go @@ -37,7 +37,7 @@ func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err erro } func (s *store) MatchTerms(field index.Field) (list posting.List, err error) { - f, err := field.Marshal(s.termMetadata) + f, err := field.Marshal() if err != nil { return nil, err } @@ -66,7 +66,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti } func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (index.FieldIterator, error) { - return 
index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm, s.termMetadata, + return index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm, func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) { pv := &index.PostingValue{ Term: term, @@ -75,7 +75,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord for ; delegated.Valid(); delegated.Next() { f := index.Field{} - err := f.Unmarshal(s.termMetadata, delegated.Key()) + err := f.Unmarshal(delegated.Key()) if err != nil { return nil, err } diff --git a/pkg/index/metadata/metadata.go b/pkg/index/metadata/metadata.go deleted file mode 100644 index 3e150e0..0000000 --- a/pkg/index/metadata/metadata.go +++ /dev/null @@ -1,79 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package metadata - -import ( - "io" - - "github.com/pkg/errors" - - "github.com/apache/skywalking-banyandb/banyand/kv" - "github.com/apache/skywalking-banyandb/banyand/observability" - "github.com/apache/skywalking-banyandb/pkg/convert" - "github.com/apache/skywalking-banyandb/pkg/logger" -) - -type Term interface { - observability.Observable - ID(term []byte) (id []byte, err error) - Literal(id []byte) (term []byte, err error) - io.Closer -} - -var _ Term = (*term)(nil) - -type term struct { - store kv.Store -} - -type TermOpts struct { - Path string - Logger *logger.Logger -} - -func NewTerm(opts TermOpts) (Term, error) { - var store kv.Store - var err error - if store, err = kv.OpenStore(0, opts.Path, kv.StoreWithNamedLogger("term_metadata", opts.Logger)); err != nil { - return nil, err - } - return &term{ - store: store, - }, nil -} - -func (t *term) ID(term []byte) (id []byte, err error) { - id = convert.Uint64ToBytes(convert.Hash(term)) - _, err = t.store.Get(id) - if errors.Is(err, kv.ErrKeyNotFound) { - return id, t.store.Put(id, term) - } - return id, nil -} - -func (t *term) Literal(id []byte) (term []byte, err error) { - return t.store.Get(id) -} - -func (t *term) Close() error { - return t.store.Close() -} - -func (t *term) Stats() observability.Statistics { - return t.store.Stats() -} diff --git a/pkg/index/testcases/service_name.go b/pkg/index/testcases/service_name.go index c6e174d..df5e067 100644 --- a/pkg/index/testcases/service_name.go +++ b/pkg/index/testcases/service_name.go @@ -31,7 +31,7 @@ import ( var serviceName = index.FieldKey{ // http_method IndexRuleID: 6, - EncodeTerm: true, + EncodeTerm: false, } func RunServiceName(t *testing.T, store SimpleStore) { diff --git a/scripts/build/test.mk b/scripts/build/test.mk index 8a2bdf8..ad65bdf 100644 --- a/scripts/build/test.mk +++ b/scripts/build/test.mk @@ -41,13 +41,13 @@ test: $(GINKGO) generate ## Run all the unit tests $(GINKGO) $(TEST_OPTS) $(TEST_EXTRA_OPTS) -tags "$(TEST_TAGS)" 
$(TEST_PKG_LIST) .PHONY: test-race -test-race: TEST_OPTS=-race +test-race: TEST_OPTS=--race test-race: test ## Run all the unit tests with race detector on .PHONY: test-coverage -test-coverage: ## Run all the unit tests with coverage analysis on +test-coverage: $(GINKGO) generate ## Run all the unit tests with coverage analysis on mkdir -p "$(TEST_COVERAGE_DIR)" - go test $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) -coverprofile="$(TEST_COVERAGE_PROFILE)" -tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST) + $(GINKGO) $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) --coverprofile="$(TEST_COVERAGE_PROFILE)" --tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST) go tool cover -html="$(TEST_COVERAGE_PROFILE)" -o "$(TEST_COVERAGE_REPORT)" @echo "Test coverage report has been saved to $(TEST_COVERAGE_REPORT)"
