This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 005b022 Access the metadata cache in the querying procedure. (#226)
005b022 is described below
commit 005b02210caacee0141de8085edebed367ef5a6f
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Dec 7 15:18:16 2022 +0800
Access the metadata cache in the querying procedure. (#226)
---
banyand/query/processor.go | 23 ++++----------
banyand/stream/stream.go | 4 +++
banyand/stream/stream_query.go | 2 ++
banyand/tsdb/seriesdb.go | 1 -
pkg/query/logical/measure/measure_analyzer.go | 44 ++++++---------------------
pkg/query/logical/measure/schema.go | 4 ---
pkg/query/logical/schema.go | 8 -----
pkg/query/logical/stream/schema.go | 4 ---
pkg/query/logical/stream/stream_analyzer.go | 41 +++++--------------------
9 files changed, 30 insertions(+), 101 deletions(-)
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 6171b1a..20b7bd6 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -52,7 +52,8 @@ var (
)
type queryService struct {
- log *logger.Logger
+ log *logger.Logger
+ // TODO: remove the metaService once
https://github.com/apache/skywalking/issues/10121 is fixed.
metaService metadata.Service
serviceRepo discovery.ServiceRepo
pipeline queue.Queue
@@ -85,19 +86,13 @@ func (p *streamQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
return
}
- analyzer, err :=
logical_stream.CreateAnalyzerFromMetaService(p.metaService)
- if err != nil {
- resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to build analyzer for stream %s: %v", meta.GetName(), err))
- return
- }
-
- s, err := analyzer.BuildSchema(context.TODO(), meta)
+ s, err := logical_stream.BuildSchema(ec)
if err != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to build schema for stream %s: %v", meta.GetName(), err))
return
}
- plan, err := analyzer.Analyze(context.TODO(), queryCriteria, meta, s)
+ plan, err := logical_stream.Analyze(context.TODO(), queryCriteria,
meta, s)
if err != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for stream %s: %v", meta.GetName(), err))
return
@@ -143,19 +138,13 @@ func (p *measureQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
return
}
- analyzer, err :=
logical_measure.CreateAnalyzerFromMetaService(p.metaService)
- if err != nil {
- resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to build analyzer for measure %s: %v", meta.GetName(), err))
- return
- }
-
- s, err := analyzer.BuildSchema(context.TODO(), meta)
+ s, err := logical_measure.BuildSchema(ec)
if err != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to build schema for measure %s: %v", meta.GetName(), err))
return
}
- plan, err := analyzer.Analyze(context.TODO(), queryCriteria, meta, s)
+ plan, err := logical_measure.Analyze(context.TODO(), queryCriteria,
meta, s)
if err != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for measure %s: %v", meta.GetName(), err))
return
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index c8866cf..711b713 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -54,6 +54,10 @@ func (s *stream) GetMetadata() *commonv1.Metadata {
return s.schema.Metadata
}
+func (s *stream) GetSchema() *databasev1.Stream {
+ return s.schema
+}
+
func (s *stream) GetIndexRules() []*databasev1.IndexRule {
return s.indexRules
}
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 5b3ef7c..b2af26d 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -45,6 +45,8 @@ type Stream interface {
Shard(id common.ShardID) (tsdb.Shard, error)
ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily,
error)
ParseElementID(item tsdb.Item) (string, error)
+ GetSchema() *databasev1.Stream
+ GetIndexRules() []*databasev1.IndexRule
}
var _ Stream = (*stream)(nil)
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 5f759e4..5b69639 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -329,7 +329,6 @@ func (s *seriesDB) Get(key []byte, entityValues
EntityValues) (Series, error) {
var series string
if e := s.l.Debug(); e.Enabled() {
- // TODO: store following info when the debug is enabled
errDecode =
s.seriesMetadata.Put(prepend(seriesID.Marshal(), seriesPrefix),
entityValuesBytes)
if errDecode != nil {
return nil, errDecode
diff --git a/pkg/query/logical/measure/measure_analyzer.go
b/pkg/query/logical/measure/measure_analyzer.go
index 412362f..e516c23 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -22,57 +22,33 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
- "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
-// Analyzer analyzes the measure querying expression to the execution plan.
-type Analyzer struct {
- metadataRepoImpl metadata.Repo
-}
-
-// CreateAnalyzerFromMetaService returns a Analyzer.
-func CreateAnalyzerFromMetaService(metaSvc metadata.Service) (*Analyzer,
error) {
- return &Analyzer{
- metaSvc,
- }, nil
-}
-
// BuildSchema returns Schema loaded from the metadata repository.
-func (a *Analyzer) BuildSchema(ctx context.Context, metadata
*commonv1.Metadata) (logical.Schema, error) {
- group, err := a.metadataRepoImpl.GroupRegistry().GetGroup(ctx,
metadata.GetGroup())
- if err != nil {
- return nil, err
- }
- measure, err := a.metadataRepoImpl.MeasureRegistry().GetMeasure(ctx,
metadata)
- if err != nil {
- return nil, err
- }
-
- indexRules, err := a.metadataRepoImpl.IndexRules(ctx, metadata)
- if err != nil {
- return nil, err
- }
+func BuildSchema(measureSchema measure.Measure) (logical.Schema, error) {
+ md := measureSchema.GetSchema()
+ md.GetEntity()
ms := &schema{
common: &logical.CommonSchema{
- Group: group,
- IndexRules: indexRules,
+ IndexRules: measureSchema.GetIndexRules(),
TagMap: make(map[string]*logical.TagSpec),
- EntityList: measure.GetEntity().GetTagNames(),
+ EntityList: md.GetEntity().GetTagNames(),
},
- measure: measure,
+ measure: md,
fieldMap: make(map[string]*logical.FieldSpec),
}
- for tagFamilyIdx, tagFamily := range measure.GetTagFamilies() {
+ for tagFamilyIdx, tagFamily := range md.GetTagFamilies() {
for tagIdx, spec := range tagFamily.GetTags() {
ms.registerTag(tagFamilyIdx, tagIdx, spec)
}
}
- for fieldIdx, spec := range measure.GetFields() {
+ for fieldIdx, spec := range md.GetFields() {
ms.registerField(fieldIdx, spec)
}
@@ -80,7 +56,7 @@ func (a *Analyzer) BuildSchema(ctx context.Context, metadata
*commonv1.Metadata)
}
// Analyze converts logical expressions to executable operation tree
represented by Plan.
-func (a *Analyzer) Analyze(_ context.Context, criteria
*measurev1.QueryRequest, metadata *commonv1.Metadata, s logical.Schema)
(logical.Plan, error) {
+func Analyze(_ context.Context, criteria *measurev1.QueryRequest, metadata
*commonv1.Metadata, s logical.Schema) (logical.Plan, error) {
groupByEntity := false
var groupByTags [][]*logical.Tag
if criteria.GetGroupBy() != nil {
diff --git a/pkg/query/logical/measure/schema.go
b/pkg/query/logical/measure/schema.go
index 7161a09..8a337aa 100644
--- a/pkg/query/logical/measure/schema.go
+++ b/pkg/query/logical/measure/schema.go
@@ -104,10 +104,6 @@ func (m *schema) Equal(s2 logical.Schema) bool {
return false
}
-func (m *schema) ShardNumber() uint32 {
- return m.common.ShardNumber()
-}
-
// registerTag registers the tag spec with given tagFamilyIdx and tagIdx.
func (m *schema) registerTag(tagFamilyIdx, tagIdx int, spec
*databasev1.TagSpec) {
m.common.RegisterTag(tagFamilyIdx, tagIdx, spec)
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index fedac76..4512b9d 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -20,7 +20,6 @@ package logical
import (
"github.com/pkg/errors"
- commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
)
@@ -36,7 +35,6 @@ type Schema interface {
ProjTags(refs ...[]*TagRef) Schema
ProjFields(refs ...*FieldRef) Schema
Equal(Schema) bool
- ShardNumber() uint32
}
// TagSpec wraps offsets to access a tag in the raw data swiftly.
@@ -55,7 +53,6 @@ func (fs *TagSpec) Equal(other *TagSpec) bool {
// CommonSchema represents a sharable fields between independent schemas.
// It provides common access methods at the same time.
type CommonSchema struct {
- Group *commonv1.Group
IndexRules []*databasev1.IndexRule
TagMap map[string]*TagSpec
EntityList []string
@@ -92,11 +89,6 @@ func (cs *CommonSchema) RegisterTag(tagFamilyIdx, tagIdx
int, spec *databasev1.T
}
}
-// ShardNumber returns the number of shards defined by the schema.
-func (cs *CommonSchema) ShardNumber() uint32 {
- return cs.Group.ResourceOpts.ShardNum
-}
-
// IndexDefined checks whether the field given is indexed.
func (cs *CommonSchema) IndexDefined(tagName string) (bool,
*databasev1.IndexRule) {
for _, idxRule := range cs.IndexRules {
diff --git a/pkg/query/logical/stream/schema.go
b/pkg/query/logical/stream/schema.go
index 3961fb3..fc7bf59 100644
--- a/pkg/query/logical/stream/schema.go
+++ b/pkg/query/logical/stream/schema.go
@@ -86,10 +86,6 @@ func (s *schema) ProjFields(...*logical.FieldRef)
logical.Schema {
panic("stream does not support field")
}
-func (s *schema) ShardNumber() uint32 {
- return s.common.ShardNumber()
-}
-
func (s *schema) Scope() tsdb.Entry {
return tsdb.Entry(s.stream.Metadata.Name)
}
diff --git a/pkg/query/logical/stream/stream_analyzer.go
b/pkg/query/logical/stream/stream_analyzer.go
index 2087bf4..5a9ee5e 100644
--- a/pkg/query/logical/stream/stream_analyzer.go
+++ b/pkg/query/logical/stream/stream_analyzer.go
@@ -22,50 +22,25 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
- "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
-// Analyzer analyzes the stream querying expression to the execution plan.
-type Analyzer struct {
- metadataRepoImpl metadata.Repo
-}
-
-// CreateAnalyzerFromMetaService returns a Analyzer.
-func CreateAnalyzerFromMetaService(metaSvc metadata.Service) (*Analyzer,
error) {
- return &Analyzer{
- metaSvc,
- }, nil
-}
-
// BuildSchema returns Schema loaded from the metadata repository.
-func (a *Analyzer) BuildSchema(ctx context.Context, metadata
*commonv1.Metadata) (logical.Schema, error) {
- group, err := a.metadataRepoImpl.GroupRegistry().GetGroup(ctx,
metadata.GetGroup())
- if err != nil {
- return nil, err
- }
- stream, err := a.metadataRepoImpl.StreamRegistry().GetStream(ctx,
metadata)
- if err != nil {
- return nil, err
- }
-
- indexRules, err := a.metadataRepoImpl.IndexRules(ctx, metadata)
- if err != nil {
- return nil, err
- }
+func BuildSchema(streamSchema stream.Stream) (logical.Schema, error) {
+ sm := streamSchema.GetSchema()
s := &schema{
common: &logical.CommonSchema{
- Group: group,
- IndexRules: indexRules,
+ IndexRules: streamSchema.GetIndexRules(),
TagMap: make(map[string]*logical.TagSpec),
- EntityList: stream.GetEntity().GetTagNames(),
+ EntityList: sm.GetEntity().GetTagNames(),
},
- stream: stream,
+ stream: sm,
}
// generate the streamSchema of the fields for the traceSeries
- for tagFamilyIdx, tagFamily := range stream.GetTagFamilies() {
+ for tagFamilyIdx, tagFamily := range sm.GetTagFamilies() {
for tagIdx, spec := range tagFamily.GetTags() {
s.registerTag(tagFamilyIdx, tagIdx, spec)
}
@@ -75,7 +50,7 @@ func (a *Analyzer) BuildSchema(ctx context.Context, metadata
*commonv1.Metadata)
}
// Analyze converts logical expressions to executable operation tree
represented by Plan.
-func (a *Analyzer) Analyze(_ context.Context, criteria *streamv1.QueryRequest,
metadata *commonv1.Metadata, s logical.Schema) (logical.Plan, error) {
+func Analyze(_ context.Context, criteria *streamv1.QueryRequest, metadata
*commonv1.Metadata, s logical.Schema) (logical.Plan, error) {
// parse fields
plan := parseTags(criteria, metadata)