This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e87c5f  Remove term metadata store (#148)
8e87c5f is described below

commit 8e87c5ff9cad1cc91fa8690cf0fc85981bbfef9d
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Jul 29 23:48:56 2022 +0800

    Remove term metadata store (#148)
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/query/processor.go          |  2 +-
 banyand/tsdb/indexdb.go             |  6 +--
 pkg/index/index.go                  | 34 +---------------
 pkg/index/inverted/inverted.go      | 29 ++++----------
 pkg/index/inverted/mem.go           | 23 +++++------
 pkg/index/iterator.go               | 17 ++++----
 pkg/index/lsm/lsm.go                | 31 ++++-----------
 pkg/index/lsm/search.go             |  6 +--
 pkg/index/metadata/metadata.go      | 79 -------------------------------------
 pkg/index/testcases/service_name.go |  2 +-
 scripts/build/test.mk               |  6 +--
 11 files changed, 45 insertions(+), 190 deletions(-)

diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 95a449c..cb7b01f 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -67,7 +67,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
                p.log.Warn().Msg("invalid event data type")
                return
        }
-       p.log.Info().Msg("received a query event")
+       p.log.Debug().Stringer("criteria", queryCriteria).Msg("received a query request")
 
        meta := queryCriteria.GetMetadata()
        ec, err := p.streamService.Stream(meta)
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index 4c8b958..261b39e 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -58,7 +58,7 @@ type indexDB struct {
 
 func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
        result := make([]GlobalItemID, 0)
-       f, err := field.MarshalStraight()
+       f, err := field.Marshal()
        if err != nil {
                return nil, err
        }
@@ -154,7 +154,7 @@ func (i *indexWriter) WriteLSMIndex(field index.Field) error {
        if i.scope != nil {
                field.Key.SeriesID = GlobalSeriesID(i.scope)
        }
-       key, err := field.MarshalStraight()
+       key, err := field.Marshal()
        if err != nil {
                return err
        }
@@ -165,7 +165,7 @@ func (i *indexWriter) WriteInvertedIndex(field index.Field) error {
        if i.scope != nil {
                field.Key.SeriesID = GlobalSeriesID(i.scope)
        }
-       key, err := field.MarshalStraight()
+       key, err := field.Marshal()
        if err != nil {
                return err
        }
diff --git a/pkg/index/index.go b/pkg/index/index.go
index ebedc32..192a94f 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -27,7 +27,6 @@ import (
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/convert"
-       "github.com/apache/skywalking-banyandb/pkg/index/metadata"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
 )
 
@@ -68,40 +67,11 @@ type Field struct {
        Term []byte
 }
 
-func (f Field) MarshalStraight() ([]byte, error) {
+func (f Field) Marshal() ([]byte, error) {
        return bytes.Join([][]byte{f.Key.Marshal(), f.Term}, nil), nil
 }
 
-func (f Field) Marshal(term metadata.Term) ([]byte, error) {
-       var t []byte
-       if f.Key.EncodeTerm {
-               var err error
-               t, err = term.ID(f.Term)
-               if err != nil {
-                       return nil, errors.Wrap(err, "get term id")
-               }
-               f.Term = t
-       }
-       return f.MarshalStraight()
-}
-
-func (f *Field) Unmarshal(term metadata.Term, raw []byte) error {
-       err := f.UnmarshalStraight(raw)
-       if err != nil {
-               return err
-       }
-       if !f.Key.EncodeTerm {
-               return nil
-       }
-       t, err := term.Literal(f.Term)
-       if err != nil {
-               return errors.Wrap(err, "get term literal from metadata store")
-       }
-       f.Term = t
-       return nil
-}
-
-func (f *Field) UnmarshalStraight(raw []byte) error {
+func (f *Field) Unmarshal(raw []byte) error {
        fk := &f.Key
        err := fk.Unmarshal(raw[:len(raw)-8])
        if err != nil {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 8073d19..c126a6b 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -29,7 +29,6 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/index"
-       "github.com/apache/skywalking-banyandb/pkg/index/metadata"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -38,7 +37,6 @@ import (
 var _ index.Store = (*store)(nil)
 
 type store struct {
-       termMetadata      metadata.Term
        diskTable         kv.IndexStore
        memTable          *memTable
        immutableMemTable *memTable
@@ -57,23 +55,15 @@ func NewStore(opts StoreOpts) (index.Store, error) {
        if err != nil {
                return nil, err
        }
-       var md metadata.Term
-       if md, err = metadata.NewTerm(metadata.TermOpts{
-               Path:   opts.Path + "/tmd",
-               Logger: opts.Logger,
-       }); err != nil {
-               return nil, err
-       }
        return &store{
-               memTable:     newMemTable(),
-               diskTable:    diskTable,
-               termMetadata: md,
-               l:            opts.Logger,
+               memTable:  newMemTable(),
+               diskTable: diskTable,
+               l:         opts.Logger,
        }, nil
 }
 
 func (s *store) Close() error {
-       return multierr.Combine(s.diskTable.Close(), s.termMetadata.Close())
+       return s.diskTable.Close()
 }
 
 func (s *store) Write(field index.Field, chunkID common.ItemID) error {
@@ -92,7 +82,7 @@ func (s *store) Flush() error {
                s.memTable = newMemTable()
        }
        err := s.diskTable.
-               Handover(s.immutableMemTable.Iter(s.termMetadata))
+               Handover(s.immutableMemTable.Iter())
        if err != nil {
                return err
        }
@@ -104,9 +94,6 @@ func (s *store) Stats() observability.Statistics {
        stat := s.mainStats()
        disk := s.diskTable.Stats()
        stat.MaxMemBytes = disk.MaxMemBytes
-       term := s.termMetadata.Stats()
-       stat.MemBytes += term.MemBytes
-       stat.MaxMemBytes += term.MaxMemBytes
        return stat
 }
 
@@ -127,7 +114,7 @@ func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) {
 }
 
 func (s *store) MatchTerms(field index.Field) (posting.List, error) {
-       f, err := field.Marshal(s.termMetadata)
+       f, err := field.Marshal()
        if err != nil {
                return nil, err
        }
@@ -197,7 +184,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
                }
                iters = append(iters, it)
        }
-       it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable, s.termMetadata,
+       it, err := index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.diskTable,
                func(term, val []byte, delegated kv.Iterator) (*index.PostingValue, error) {
                        list := roaring.NewPostingList()
                        err := list.Unmarshall(val)
@@ -214,7 +201,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
                                f := index.Field{
                                        Key: fieldKey,
                                }
-                               err := f.Unmarshal(s.termMetadata, delegated.Key())
+                               err := f.Unmarshal(delegated.Key())
                                if err != nil {
                                        return nil, err
                                }
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index 4d78f69..1b70b2c 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -28,7 +28,6 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/index"
-       "github.com/apache/skywalking-banyandb/pkg/index/metadata"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
 )
@@ -147,14 +146,13 @@ func (m *memTable) MatchTerms(field index.Field) (posting.List, error) {
 var _ kv.Iterator = (*flushIterator)(nil)
 
 type flushIterator struct {
-       fieldIdx     int
-       termIdx      int
-       key          []byte
-       value        []byte
-       fields       *fieldMap
-       valid        bool
-       err          error
-       termMetadata metadata.Term
+       fieldIdx int
+       termIdx  int
+       key      []byte
+       value    []byte
+       fields   *fieldMap
+       valid    bool
+       err      error
 }
 
 func (i *flushIterator) Next() {
@@ -228,7 +226,7 @@ func (i *flushIterator) setCurr() bool {
                Key:  term.key,
                Term: value.Term,
        }
-       i.key, err = f.Marshal(i.termMetadata)
+       i.key, err = f.Marshal()
        if err != nil {
                i.err = multierr.Append(i.err, err)
                return false
@@ -236,9 +234,8 @@ func (i *flushIterator) setCurr() bool {
        return true
 }
 
-func (m *memTable) Iter(termMetadata metadata.Term) kv.Iterator {
+func (m *memTable) Iter() kv.Iterator {
        return &flushIterator{
-               fields:       m.fields,
-               termMetadata: termMetadata,
+               fields: m.fields,
        }
 }
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index 559802f..fe94b44 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -26,7 +26,6 @@ import (
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/pkg/convert"
-       "github.com/apache/skywalking-banyandb/pkg/index/metadata"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -89,7 +88,7 @@ func (f *FieldIteratorTemplate) Close() error {
 }
 
 func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, iterable kv.Iterable,
-       metadata metadata.Term, fn CompositePostingValueFn,
+       fn CompositePostingValueFn,
 ) (*FieldIteratorTemplate, error) {
        if termRange.Upper == nil {
                termRange.Upper = DefaultUpper
@@ -118,12 +117,12 @@ func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange Ran
                Key:  fieldKey,
                Term: term,
        }
-       seekKey, err := field.Marshal(metadata)
+       seekKey, err := field.Marshal()
        if err != nil {
                return nil, err
        }
        return &FieldIteratorTemplate{
-               delegated: newDelegateIterator(iter, fieldKey, metadata, l),
+               delegated: newDelegateIterator(iter, fieldKey, l),
                termRange: termRange,
                fn:        fn,
                reverse:   reverse,
@@ -131,11 +130,11 @@ func NewFieldIteratorTemplate(l *logger.Logger, fieldKey FieldKey, termRange Ran
        }, nil
 }
 
-func parseKey(fieldKey FieldKey, metadata metadata.Term, key []byte) (Field, error) {
+func parseKey(fieldKey FieldKey, key []byte) (Field, error) {
        f := &Field{
                Key: fieldKey,
        }
-       err := f.Unmarshal(metadata, key)
+       err := f.Unmarshal(key)
        if err != nil {
                return *f, err
        }
@@ -235,20 +234,18 @@ type delegateIterator struct {
        delegated     kv.Iterator
        fieldKey      FieldKey
        fieldKeyBytes []byte
-       metadata      metadata.Term
        l             *logger.Logger
 
        curField Field
        closed   bool
 }
 
-func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, metadata metadata.Term, l *logger.Logger) *delegateIterator {
+func newDelegateIterator(delegated kv.Iterator, fieldKey FieldKey, l *logger.Logger) *delegateIterator {
        fieldKeyBytes := fieldKey.Marshal()
        return &delegateIterator{
                delegated:     delegated,
                fieldKey:      fieldKey,
                fieldKeyBytes: fieldKeyBytes,
-               metadata:      metadata,
                l:             l,
        }
 }
@@ -282,7 +279,7 @@ func (di *delegateIterator) Valid() bool {
                return false
        }
        var err error
-       di.curField, err = parseKey(di.fieldKey, di.metadata, di.Key())
+       di.curField, err = parseKey(di.fieldKey, di.Key())
        if err != nil {
                di.l.Error().Err(err).Msg("fail to parse field from key")
                di.Close()
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index b2ef18c..53de61b 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -18,23 +18,19 @@
 package lsm
 
 import (
-       "go.uber.org/multierr"
-
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
-       "github.com/apache/skywalking-banyandb/pkg/index/metadata"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 var _ index.Store = (*store)(nil)
 
 type store struct {
-       lsm          kv.Store
-       termMetadata metadata.Term
-       l            *logger.Logger
+       lsm kv.Store
+       l   *logger.Logger
 }
 
 func (*store) Flush() error {
@@ -42,20 +38,15 @@ func (*store) Flush() error {
 }
 
 func (s *store) Stats() observability.Statistics {
-       store := s.lsm.Stats()
-       term := s.termMetadata.Stats()
-       return observability.Statistics{
-               MemBytes:    store.MemBytes + term.MemBytes,
-               MaxMemBytes: store.MaxMemBytes + term.MaxMemBytes,
-       }
+       return s.lsm.Stats()
 }
 
 func (s *store) Close() error {
-       return multierr.Combine(s.lsm.Close(), s.termMetadata.Close())
+       return s.lsm.Close()
 }
 
 func (s *store) Write(field index.Field, itemID common.ItemID) error {
-       f, err := field.Marshal(s.termMetadata)
+       f, err := field.Marshal()
        if err != nil {
                return err
        }
@@ -74,16 +65,8 @@ func NewStore(opts StoreOpts) (index.Store, error) {
        if lsm, err = kv.OpenStore(0, opts.Path+"/lsm", kv.StoreWithLogger(opts.Logger)); err != nil {
                return nil, err
        }
-       var md metadata.Term
-       if md, err = metadata.NewTerm(metadata.TermOpts{
-               Path:   opts.Path + "/tmd",
-               Logger: opts.Logger,
-       }); err != nil {
-               return nil, err
-       }
        return &store{
-               lsm:          lsm,
-               termMetadata: md,
-               l:            opts.Logger,
+               lsm: lsm,
+               l:   opts.Logger,
        }, nil
 }
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
index 43f1948..5e877fa 100644
--- a/pkg/index/lsm/search.go
+++ b/pkg/index/lsm/search.go
@@ -37,7 +37,7 @@ func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err erro
 }
 
 func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
-       f, err := field.Marshal(s.termMetadata)
+       f, err := field.Marshal()
        if err != nil {
                return nil, err
        }
@@ -66,7 +66,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
 }
 
 func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (index.FieldIterator, error) {
-       return index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm, s.termMetadata,
+       return index.NewFieldIteratorTemplate(s.l, fieldKey, termRange, order, s.lsm,
                func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) {
                        pv := &index.PostingValue{
                                Term:  term,
@@ -75,7 +75,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
 
                        for ; delegated.Valid(); delegated.Next() {
                                f := index.Field{}
-                               err := f.Unmarshal(s.termMetadata, delegated.Key())
+                               err := f.Unmarshal(delegated.Key())
                                if err != nil {
                                        return nil, err
                                }
diff --git a/pkg/index/metadata/metadata.go b/pkg/index/metadata/metadata.go
deleted file mode 100644
index 3e150e0..0000000
--- a/pkg/index/metadata/metadata.go
+++ /dev/null
@@ -1,79 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package metadata
-
-import (
-       "io"
-
-       "github.com/pkg/errors"
-
-       "github.com/apache/skywalking-banyandb/banyand/kv"
-       "github.com/apache/skywalking-banyandb/banyand/observability"
-       "github.com/apache/skywalking-banyandb/pkg/convert"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-)
-
-type Term interface {
-       observability.Observable
-       ID(term []byte) (id []byte, err error)
-       Literal(id []byte) (term []byte, err error)
-       io.Closer
-}
-
-var _ Term = (*term)(nil)
-
-type term struct {
-       store kv.Store
-}
-
-type TermOpts struct {
-       Path   string
-       Logger *logger.Logger
-}
-
-func NewTerm(opts TermOpts) (Term, error) {
-       var store kv.Store
-       var err error
-       if store, err = kv.OpenStore(0, opts.Path, kv.StoreWithNamedLogger("term_metadata", opts.Logger)); err != nil {
-               return nil, err
-       }
-       return &term{
-               store: store,
-       }, nil
-}
-
-func (t *term) ID(term []byte) (id []byte, err error) {
-       id = convert.Uint64ToBytes(convert.Hash(term))
-       _, err = t.store.Get(id)
-       if errors.Is(err, kv.ErrKeyNotFound) {
-               return id, t.store.Put(id, term)
-       }
-       return id, nil
-}
-
-func (t *term) Literal(id []byte) (term []byte, err error) {
-       return t.store.Get(id)
-}
-
-func (t *term) Close() error {
-       return t.store.Close()
-}
-
-func (t *term) Stats() observability.Statistics {
-       return t.store.Stats()
-}
diff --git a/pkg/index/testcases/service_name.go b/pkg/index/testcases/service_name.go
index c6e174d..df5e067 100644
--- a/pkg/index/testcases/service_name.go
+++ b/pkg/index/testcases/service_name.go
@@ -31,7 +31,7 @@ import (
 var serviceName = index.FieldKey{
        // http_method
        IndexRuleID: 6,
-       EncodeTerm:  true,
+       EncodeTerm:  false,
 }
 
 func RunServiceName(t *testing.T, store SimpleStore) {
diff --git a/scripts/build/test.mk b/scripts/build/test.mk
index 8a2bdf8..ad65bdf 100644
--- a/scripts/build/test.mk
+++ b/scripts/build/test.mk
@@ -41,13 +41,13 @@ test: $(GINKGO) generate ## Run all the unit tests
        $(GINKGO) $(TEST_OPTS) $(TEST_EXTRA_OPTS) -tags "$(TEST_TAGS)" $(TEST_PKG_LIST)
 
 .PHONY: test-race
-test-race: TEST_OPTS=-race
+test-race: TEST_OPTS=--race
 test-race: test  ## Run all the unit tests with race detector on
 
 .PHONY: test-coverage
-test-coverage: ## Run all the unit tests with coverage analysis on
+test-coverage: $(GINKGO) generate ## Run all the unit tests with coverage analysis on
        mkdir -p "$(TEST_COVERAGE_DIR)"
-       go test $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) -coverprofile="$(TEST_COVERAGE_PROFILE)" -tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST)
+       $(GINKGO) $(TEST_COVERAGE_OPTS) $(TEST_COVERAGE_EXTRA_OPTS) --coverprofile="$(TEST_COVERAGE_PROFILE)" --tags "$(TEST_TAGS)" $(TEST_COVERAGE_PKG_LIST)
        go tool cover -html="$(TEST_COVERAGE_PROFILE)" -o "$(TEST_COVERAGE_REPORT)"
        @echo "Test coverage report has been saved to $(TEST_COVERAGE_REPORT)"
 

Reply via email to