This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 5cee6c26 Distributed query (#326) 5cee6c26 is described below commit 5cee6c2600153cf4d71fde15a169f8467bc5e002 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Mon Sep 11 16:45:02 2023 +0800 Distributed query (#326) --- CHANGES.md | 1 + api/proto/banyandb/cluster/v1/rpc.proto | 18 +- api/proto/banyandb/database/v1/database.proto | 3 +- banyand/dquery/dquery.go | 87 +++ banyand/dquery/measure.go | 100 ++++ banyand/dquery/stream.go | 84 +++ banyand/dquery/topn.go | 122 ++++ banyand/internal/cmd/liaison.go | 6 + banyand/measure/measure.go | 38 +- banyand/measure/measure_write.go | 2 +- banyand/measure/metadata.go | 63 ++- banyand/measure/metadata_test.go | 10 +- banyand/measure/service.go | 6 +- banyand/query/processor.go | 15 +- banyand/queue/local.go | 8 + banyand/queue/pub/pub.go | 147 ++++- banyand/queue/queue.go | 1 + banyand/queue/sub/server.go | 2 +- banyand/queue/sub/sub.go | 56 +- banyand/stream/metadata.go | 65 ++- banyand/stream/metadata_test.go | 10 +- banyand/stream/service.go | 12 +- banyand/stream/stream.go | 34 +- docs/api-reference.md | 627 +++++++++++---------- pkg/bus/bus.go | 5 + pkg/iter/sort/sort.go | 133 +++++ pkg/iter/sort/sort_test.go | 102 ++++ pkg/query/executor/interface.go | 58 +- pkg/query/logical/common.go | 32 +- pkg/query/logical/index_filter.go | 14 +- pkg/query/logical/iter.go | 134 ----- pkg/query/logical/measure/measure_analyzer.go | 67 ++- pkg/query/logical/measure/measure_plan.go | 3 +- .../logical/measure/measure_plan_aggregation.go | 3 +- .../logical/measure/measure_plan_distributed.go | 234 ++++++++ pkg/query/logical/measure/measure_plan_groupby.go | 7 +- .../measure/measure_plan_indexscan_local.go | 18 +- pkg/query/logical/measure/measure_plan_top.go | 3 +- pkg/query/logical/stream/stream_analyzer.go | 28 +- .../logical/stream/stream_plan_distributed.go | 214 +++++++ .../logical/stream/stream_plan_indexscan_global.go | 11 +- 
.../logical/stream/stream_plan_indexscan_local.go | 11 +- pkg/query/logical/stream/stream_plan_tag_filter.go | 5 +- pkg/run/channel_closer.go | 2 +- pkg/schema/metadata.go | 364 ++++++------ pkg/schema/schema.go | 96 ++++ 46 files changed, 2237 insertions(+), 824 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 54f68ee0..0d98483c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ Release Notes. - Register the node role to the metadata registry. - Implement the remote queue to spreading data to data nodes. - Fix parse environment variables error +- Implement the distributed query engine. ### Bugs diff --git a/api/proto/banyandb/cluster/v1/rpc.proto b/api/proto/banyandb/cluster/v1/rpc.proto index 076c21e6..732bbe1f 100644 --- a/api/proto/banyandb/cluster/v1/rpc.proto +++ b/api/proto/banyandb/cluster/v1/rpc.proto @@ -19,22 +19,22 @@ syntax = "proto3"; package banyandb.cluster.v1; -import "banyandb/measure/v1/write.proto"; -import "banyandb/stream/v1/write.proto"; +import "google/protobuf/any.proto"; option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"; -message WriteRequest { +message SendRequest { string topic = 1; uint64 message_id = 2; - oneof request { - stream.v1.InternalWriteRequest stream = 3; - measure.v1.InternalWriteRequest measure = 4; - } + google.protobuf.Any body = 3; } -message WriteResponse {} +message SendResponse { + uint64 message_id = 1; + string error = 2; + google.protobuf.Any body = 3; +} service Service { - rpc Write(stream WriteRequest) returns (stream WriteResponse); + rpc Send(stream SendRequest) returns (stream SendResponse); } diff --git a/api/proto/banyandb/database/v1/database.proto b/api/proto/banyandb/database/v1/database.proto index 23309d0a..8ae7ffc3 100644 --- a/api/proto/banyandb/database/v1/database.proto +++ b/api/proto/banyandb/database/v1/database.proto @@ -29,8 +29,7 @@ enum Role { ROLE_UNSPECIFIED = 0; ROLE_META = 1; ROLE_DATA = 2; - ROLE_QUERY = 3; - ROLE_LIAISON = 4; + 
ROLE_LIAISON = 3; } message Node { diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go new file mode 100644 index 00000000..61f490c8 --- /dev/null +++ b/banyand/dquery/dquery.go @@ -0,0 +1,87 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package dquery implement the distributed query. +package dquery + +import ( + "context" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/stream" + "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/query/executor" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +const ( + moduleName = "distributed-query" +) + +type queryService struct { + log *logger.Logger + metaService metadata.Repo + sqp *streamQueryProcessor + mqp *measureQueryProcessor + tqp *topNQueryProcessor +} + +// NewService return a new query service. 
+func NewService(metaService metadata.Repo, broadcaster bus.Broadcaster, +) (run.Unit, error) { + svc := &queryService{ + metaService: metaService, + } + svc.sqp = &streamQueryProcessor{ + queryService: svc, + broadcaster: broadcaster, + } + svc.mqp = &measureQueryProcessor{ + queryService: svc, + broadcaster: broadcaster, + } + svc.tqp = &topNQueryProcessor{ + queryService: svc, + broadcaster: broadcaster, + } + return svc, nil +} + +func (q *queryService) Name() string { + return moduleName +} + +func (q *queryService) PreRun(_ context.Context) error { + q.log = logger.GetLogger(moduleName) + q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log) + q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log) + return nil +} + +var _ executor.DistributedExecutionContext = (*distributedContext)(nil) + +type distributedContext struct { + bus.Broadcaster + timeRange *modelv1.TimeRange +} + +func (dc *distributedContext) TimeRange() *modelv1.TimeRange { + return dc.timeRange +} diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go new file mode 100644 index 00000000..522a9cda --- /dev/null +++ b/banyand/dquery/measure.go @@ -0,0 +1,100 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package dquery + +import ( + "context" + "time" + + "github.com/apache/skywalking-banyandb/api/common" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + "github.com/apache/skywalking-banyandb/banyand/measure" + "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/query/executor" + logical_measure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure" +) + +type measureQueryProcessor struct { + measureService measure.Query + broadcaster bus.Broadcaster + *queryService +} + +func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { + queryCriteria, ok := message.Data().(*measurev1.QueryRequest) + now := time.Now().UnixNano() + if !ok { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type")) + return + } + ml := p.log.Named("measure", queryCriteria.Metadata.Group, queryCriteria.Metadata.Name) + if e := ml.Debug(); e.Enabled() { + e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received a query event") + } + + meta := queryCriteria.GetMetadata() + ec, err := p.measureService.Measure(meta) + if err != nil { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to get execution context for measure %s: %v", meta.GetName(), err)) + return + } + + s, err := logical_measure.BuildSchema(ec.GetSchema(), ec.GetIndexRules()) + if err != nil { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for measure %s: %v", meta.GetName(), err)) + return + } + + plan, err := logical_measure.DistributedAnalyze(queryCriteria, s) + if err != nil { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for measure %s: %v", meta.GetName(), err)) + return + } + + if e := ml.Debug(); e.Enabled() { + e.Str("plan", 
plan.String()).Msg("query plan") + } + + mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(context.Background(), &distributedContext{ + Broadcaster: p.broadcaster, + timeRange: queryCriteria.TimeRange, + })) + if err != nil { + ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan") + resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to execute the query plan for measure %s: %v", meta.GetName(), err)) + return + } + defer func() { + if err = mIterator.Close(); err != nil { + ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan") + } + }() + result := make([]*measurev1.DataPoint, 0) + for mIterator.Next() { + current := mIterator.Current() + if len(current) > 0 { + result = append(result, current[0]) + } + } + if e := ml.Debug(); e.Enabled() { + e.RawJSON("ret", logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure") + } + resp = bus.NewMessage(bus.MessageID(now), result) + return +} diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go new file mode 100644 index 00000000..3a92aaeb --- /dev/null +++ b/banyand/dquery/stream.go @@ -0,0 +1,84 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package dquery + +import ( + "context" + "time" + + "github.com/apache/skywalking-banyandb/api/common" + streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + "github.com/apache/skywalking-banyandb/banyand/stream" + "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/query/executor" + logical_stream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" +) + +type streamQueryProcessor struct { + streamService stream.Query + broadcaster bus.Broadcaster + *queryService +} + +func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { + now := time.Now().UnixNano() + queryCriteria, ok := message.Data().(*streamv1.QueryRequest) + if !ok { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type")) + return + } + if p.log.Debug().Enabled() { + p.log.Debug().RawJSON("criteria", logger.Proto(queryCriteria)).Msg("received a query request") + } + + meta := queryCriteria.GetMetadata() + ec, err := p.streamService.Stream(meta) + if err != nil { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to get execution context for stream %s: %v", meta.GetName(), err)) + return + } + s, err := logical_stream.BuildSchema(ec.GetSchema(), ec.GetIndexRules()) + if err != nil { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for stream %s: %v", meta.GetName(), err)) + return + } + + plan, err := logical_stream.DistributedAnalyze(queryCriteria, s) + if err != nil { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for stream %s: %v", meta.GetName(), err)) + return + } + + if p.log.Debug().Enabled() { + p.log.Debug().Str("plan", plan.String()).Msg("query plan") + } + entities, err := 
plan.(executor.StreamExecutable).Execute(executor.WithDistributedExecutionContext(context.Background(), &distributedContext{ + Broadcaster: p.broadcaster, + timeRange: queryCriteria.TimeRange, + })) + if err != nil { + p.log.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to execute the query plan") + resp = bus.NewMessage(bus.MessageID(now), common.NewError("execute the query plan for stream %s: %v", meta.GetName(), err)) + return + } + + resp = bus.NewMessage(bus.MessageID(now), entities) + + return +} diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go new file mode 100644 index 00000000..df34d337 --- /dev/null +++ b/banyand/dquery/topn.go @@ -0,0 +1,122 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package dquery + +import ( + "go.uber.org/multierr" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/api/data" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/iter/sort" +) + +type topNQueryProcessor struct { + broadcaster bus.Broadcaster + *queryService +} + +func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { + request, ok := message.Data().(*measurev1.TopNRequest) + if !ok { + t.log.Warn().Msg("invalid event data type") + return + } + if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED { + t.log.Warn().Msg("invalid requested sort direction") + return + } + if e := t.log.Debug(); e.Enabled() { + e.Stringer("req", request).Msg("received a topN query event") + } + now := bus.MessageID(request.TimeRange.Begin.Nanos) + ff, err := t.broadcaster.Broadcast(data.TopicTopNQuery, bus.NewMessage(now, request)) + if err != nil { + resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.Metadata.GetName(), err)) + return + } + var allErr error + var sii []sort.Iterator[*comparableTopNItem] + var latestTimestamp *timestamppb.Timestamp + for _, f := range ff { + if m, getErr := f.Get(); getErr != nil { + allErr = multierr.Append(allErr, getErr) + } else { + tl := m.Data().(*measurev1.TopNList) + un := tl.Timestamp.AsTime().UnixNano() + if un > latestTimestamp.AsTime().UnixNano() { + latestTimestamp = tl.Timestamp + } + sii = append(sii, &sortedTopNList{TopNList: tl}) + } + } + var desc bool + if request.GetFieldValueSort() == modelv1.Sort_SORT_DESC { + desc = true + } + iter := sort.NewItemIter[*comparableTopNItem](sii, desc) + defer func() { + _ = iter.Close() 
+ }() + var items []*measurev1.TopNList_Item + for iter.Next() && len(items) < int(request.TopN) { + items = append(items, iter.Val().TopNList_Item) + } + resp = bus.NewMessage(now, &measurev1.TopNList{ + Items: items, + Timestamp: latestTimestamp, + }) + return +} + +var _ sort.Comparable = (*comparableTopNItem)(nil) + +type comparableTopNItem struct { + *measurev1.TopNList_Item +} + +func (c *comparableTopNItem) SortedField() []byte { + return convert.Int64ToBytes(c.Value.GetInt().Value) +} + +var _ sort.Iterator[*comparableTopNItem] = (*sortedTopNList)(nil) + +type sortedTopNList struct { + *measurev1.TopNList + index int +} + +func (*sortedTopNList) Close() error { + return nil +} + +func (s *sortedTopNList) Next() bool { + if s.index >= len(s.Items) { + return false + } + s.index++ + return s.index < len(s.Items) +} + +func (s *sortedTopNList) Val() *comparableTopNItem { + return &comparableTopNItem{s.Items[s.index-1]} +} diff --git a/banyand/internal/cmd/liaison.go b/banyand/internal/cmd/liaison.go index 0b842201..d09428f0 100644 --- a/banyand/internal/cmd/liaison.go +++ b/banyand/internal/cmd/liaison.go @@ -24,6 +24,7 @@ import ( "github.com/spf13/cobra" "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/dquery" "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/liaison/http" "github.com/apache/skywalking-banyandb/banyand/metadata" @@ -49,10 +50,15 @@ func newLiaisonCmd() *cobra.Command { profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() httpServer := http.NewServer() + dQuery, err := dquery.NewService(metaSvc, pipeline) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate distributed query service") + } units := []run.Unit{ new(signal.Handler), pipeline, + dQuery, grpcServer, httpServer, profSvc, diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index 9fbfe653..b8a55140 100644 --- 
a/banyand/measure/measure.go +++ b/banyand/measure/measure.go @@ -22,7 +22,6 @@ package measure import ( "context" - "math" "time" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" @@ -32,7 +31,6 @@ import ( "github.com/apache/skywalking-banyandb/banyand/tsdb/index" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/query/logical" - resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -43,18 +41,17 @@ const ( ) type measure struct { - databaseSupplier tsdb.Supplier - l *logger.Logger - schema *databasev1.Measure - indexWriter *index.Writer - processorManager *topNProcessorManager - name string - group string - indexRules []*databasev1.IndexRule - topNAggregations []*databasev1.TopNAggregation - maxObservedModRevision int64 - interval time.Duration - shardNum uint32 + databaseSupplier tsdb.Supplier + l *logger.Logger + schema *databasev1.Measure + indexWriter *index.Writer + processorManager *topNProcessorManager + name string + group string + indexRules []*databasev1.IndexRule + topNAggregations []*databasev1.TopNAggregation + interval time.Duration + shardNum uint32 } func (s *measure) startSteamingManager(pipeline queue.Queue) error { @@ -80,22 +77,10 @@ func (s *measure) GetSchema() *databasev1.Measure { return s.schema } -func (s *measure) GetMetadata() *commonv1.Metadata { - return s.schema.Metadata -} - func (s *measure) GetIndexRules() []*databasev1.IndexRule { return s.indexRules } -func (s *measure) GetTopN() []*databasev1.TopNAggregation { - return s.topNAggregations -} - -func (s *measure) MaxObservedModRevision() int64 { - return s.maxObservedModRevision -} - func (s *measure) Close() error { if s.processorManager == nil { return nil @@ -105,7 +90,6 @@ func (s *measure) Close() error { func (s *measure) parseSpec() (err error) { s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup() 
- s.maxObservedModRevision = int64(math.Max(float64(resourceSchema.ParseMaxModRevision(s.indexRules)), float64(resourceSchema.ParseMaxModRevision(s.topNAggregations)))) if s.schema.Interval != "" { s.interval, err = timestamp.ParseDuration(s.schema.Interval) } diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go index 2b5f561a..3235b5c6 100644 --- a/banyand/measure/measure_write.go +++ b/banyand/measure/measure_write.go @@ -134,7 +134,7 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb if s.processorManager != nil { s.processorManager.onMeasureWrite(&measurev1.InternalWriteRequest{ Request: &measurev1.WriteRequest{ - Metadata: s.GetMetadata(), + Metadata: s.GetSchema().Metadata, DataPoint: value, }, EntityValues: entityValues[1:], diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 9df34dfa..1185442b 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -20,6 +20,7 @@ package measure import ( "context" "fmt" + "io" "path" "time" @@ -50,7 +51,7 @@ type schemaRepo struct { func newSchemaRepo(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue, encoderBufferSize, bufferSize int64, ) schemaRepo { - return schemaRepo{ + sr := schemaRepo{ l: l, metadata: metadata, Repository: resourceSchema.NewRepository( @@ -59,6 +60,38 @@ func newSchemaRepo(path string, metadata metadata.Repo, newSupplier(path, metadata, dbOpts, l, pipeline, encoderBufferSize, bufferSize), ), } + sr.start() + return sr +} + +// NewPortableRepository creates a new portable repository. +func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) Query { + r := &schemaRepo{ + l: l, + metadata: metadata, + Repository: resourceSchema.NewPortableRepository( + metadata, + l, + newPortableSupplier(metadata, l), + ), + } + r.start() + return r +} + +func (sr *schemaRepo) start() { + sr.Watcher() + sr.metadata. 
+ RegisterHandler("measure", schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation, + sr) +} + +func (sr *schemaRepo) Measure(metadata *commonv1.Metadata) (Measure, error) { + sm, ok := sr.loadMeasure(metadata) + if !ok { + return nil, errors.WithStack(ErrMeasureNotExist) + } + return sm, nil } func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) { @@ -263,7 +296,7 @@ func (sr *schemaRepo) loadMeasure(metadata *commonv1.Metadata) (*measure, bool) if !ok { return nil, false } - s, ok := r.(*measure) + s, ok := r.Delegated().(*measure) return s, ok } @@ -292,12 +325,12 @@ func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, } } -func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resourceSchema.ResourceSpec) (resourceSchema.Resource, error) { - measureSchema := spec.Schema.(*databasev1.Measure) +func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resourceSchema.Resource) (io.Closer, error) { + measureSchema := spec.Schema().(*databasev1.Measure) return openMeasure(shardNum, db, measureSpec{ schema: measureSchema, - indexRules: spec.IndexRules, - topNAggregations: spec.Aggregations, + indexRules: spec.IndexRules(), + topNAggregations: spec.TopN(), }, s.l, s.pipeline) } @@ -349,3 +382,21 @@ func intervalFn(key []byte) time.Duration { } return interval } + +type portableSupplier struct { + metadata metadata.Repo + l *logger.Logger +} + +func newPortableSupplier(metadata metadata.Repo, l *logger.Logger) *portableSupplier { + return &portableSupplier{ + metadata: metadata, + l: l, + } +} + +func (s *portableSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return s.metadata.MeasureRegistry().GetMeasure(ctx, md) +} diff --git a/banyand/measure/metadata_test.go b/banyand/measure/metadata_test.go index 
9504c1f9..f6cda0f1 100644 --- a/banyand/measure/metadata_test.go +++ b/banyand/measure/metadata_test.go @@ -37,6 +37,10 @@ var _ = Describe("Metadata", func() { BeforeEach(func() { svcs, deferFn = setUp() goods = gleak.Goroutines() + Eventually(func() bool { + _, ok := svcs.measure.LoadGroup("sw_metric") + return ok + }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue()) }) AfterEach(func() { @@ -45,12 +49,6 @@ var _ = Describe("Metadata", func() { }) Context("Manage group", func() { - It("should pass smoke test", func() { - Eventually(func() bool { - _, ok := svcs.measure.LoadGroup("sw_metric") - return ok - }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue()) - }) It("should close the group", func() { deleted, err := svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "sw_metric") Expect(err).ShouldNot(HaveOccurred()) diff --git a/banyand/measure/service.go b/banyand/measure/service.go index 47c93684..43940d8f 100644 --- a/banyand/measure/service.go +++ b/banyand/measure/service.go @@ -27,7 +27,6 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" - "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/tsdb" @@ -114,10 +113,7 @@ func (s *service) PreRun(_ context.Context) error { s.schemaRepo = newSchemaRepo(path, s.metadata, s.dbOpts, s.l, s.localPipeline, int64(s.BlockEncoderBufferSize), int64(s.BlockBufferSize)) // run a serial watcher - go s.schemaRepo.Watcher() - s.metadata. 
- RegisterHandler("measure", schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation, - &s.schemaRepo) + s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo) err := s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener) if err != nil { diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 67df4b56..a47e6df8 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -25,7 +25,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/measure" @@ -83,8 +82,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to get execution context for stream %s: %v", meta.GetName(), err)) return } - - s, err := logical_stream.BuildSchema(ec) + s, err := logical_stream.BuildSchema(ec.GetSchema(), ec.GetIndexRules()) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for stream %s: %v", meta.GetName(), err)) return @@ -99,8 +97,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) { if p.log.Debug().Enabled() { p.log.Debug().Str("plan", plan.String()).Msg("query plan") } - - entities, err := plan.(executor.StreamExecutable).Execute(ec) + entities, err := plan.(executor.StreamExecutable).Execute(executor.WithStreamExecutionContext(context.Background(), ec)) if err != nil { p.log.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to execute the query plan") resp = bus.NewMessage(bus.MessageID(now), common.NewError("execute the query plan for stream %s: %v", 
meta.GetName(), err)) @@ -136,7 +133,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { return } - s, err := logical_measure.BuildSchema(ec) + s, err := logical_measure.BuildSchema(ec.GetSchema(), ec.GetIndexRules()) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for measure %s: %v", meta.GetName(), err)) return @@ -152,7 +149,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) { e.Str("plan", plan.String()).Msg("query plan") } - mIterator, err := plan.(executor.MeasureExecutable).Execute(ec) + mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(context.Background(), ec)) if err != nil { ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan") resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to execute the query plan for measure %s: %v", meta.GetName(), err)) @@ -181,10 +178,6 @@ func (q *queryService) Name() string { return moduleName } -func (q *queryService) Role() databasev1.Role { - return databasev1.Role_ROLE_QUERY -} - func (q *queryService) PreRun(_ context.Context) error { q.log = logger.GetLogger(moduleName) return multierr.Combine( diff --git a/banyand/queue/local.go b/banyand/queue/local.go index 659e908e..09ca07aa 100644 --- a/banyand/queue/local.go +++ b/banyand/queue/local.go @@ -62,6 +62,14 @@ func (l *local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, er return l.local.Publish(topic, message...) 
} +func (l *local) Broadcast(topic bus.Topic, message bus.Message) ([]bus.Future, error) { + f, err := l.Publish(topic, message) + if err != nil { + return nil, err + } + return []bus.Future{f}, nil +} + func (l local) Name() string { return "local-pipeline" } diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index 1f3e3cf4..b72e9a07 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -20,15 +20,17 @@ package pub import ( "context" + "errors" "fmt" + "io" "sync" "time" "go.uber.org/multierr" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" - measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" - streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/queue" @@ -66,8 +68,56 @@ func (p *pub) Serve() run.StopNotify { return p.closer.CloseNotify() } -func (*pub) Publish(_ bus.Topic, _ ...bus.Message) (bus.Future, error) { - panic("unimplemented") +func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, error) { + var names []string + p.mu.RLock() + for k := range p.clients { + names = append(names, k) + } + p.mu.RUnlock() + var futures []bus.Future + for _, n := range names { + f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data())) + if err != nil { + return nil, err + } + futures = append(futures, f) + } + return futures, nil +} + +func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) { + var err error + f := &future{} + handleMessage := func(m bus.Message, err error) error { + r, errSend := messageToRequest(topic, m) + if errSend != nil { + return multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, 
errSend)) + } + node := m.Node() + p.mu.Lock() + client, ok := p.clients[node] + p.mu.Unlock() + if !ok { + return multierr.Append(err, fmt.Errorf("failed to get client for node %s", node)) + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + stream, errCreateStream := client.client.Send(ctx) + if err != nil { + return multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream)) + } + errSend = stream.Send(r) + if errSend != nil { + return multierr.Append(err, fmt.Errorf("failed to send message to node %s: %w", node, errSend)) + } + f.clients = append(f.clients, stream) + return err + } + for _, m := range messages { + err = handleMessage(m, err) + } + return f, err } // NewBatchPublisher returns a new batch publisher. @@ -96,7 +146,7 @@ func (p *pub) PreRun(context.Context) error { } type writeStream struct { - client clusterv1.Service_WriteClient + client clusterv1.Service_SendClient cancel func() } @@ -113,27 +163,14 @@ func (bp *batchPublisher) Close() (err error) { return err } -func (bp *batchPublisher) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, error) { - var err error - for _, m := range message { - if !m.IsRemote() { - continue - } - r := &clusterv1.WriteRequest{ - Topic: topic.String(), - MessageId: uint64(m.ID()), - } - switch d := m.Data().(type) { - case *streamv1.InternalWriteRequest: - r.Request = &clusterv1.WriteRequest_Stream{Stream: d} - case *measurev1.InternalWriteRequest: - r.Request = &clusterv1.WriteRequest_Measure{Measure: d} - default: - err = multierr.Append(err, fmt.Errorf("invalid message type %T", d)) +func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) { + for _, m := range messages { + r, err := messageToRequest(topic, m) + if err != nil { + err = multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, err)) continue } node := m.Node() - sendData := func() (success bool) { if 
stream, ok := bp.streams[node]; !ok { defer func() { @@ -149,7 +186,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, message ...bus.Message) (bus. } errSend := stream.client.Send(r) if errSend != nil { - err = multierr.Append(errSend, fmt.Errorf("failed to send message to node %s: %w", node, err)) + err = multierr.Append(err, fmt.Errorf("failed to send message to node %s: %w", node, errSend)) return false } _, errRecv := stream.client.Recv() @@ -170,7 +207,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, message ...bus.Message) (bus. } //nolint: govet ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - stream, errCreateStream := client.client.Write(ctx) + stream, errCreateStream := client.client.Send(ctx) if err != nil { err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream)) continue @@ -182,5 +219,63 @@ func (bp *batchPublisher) Publish(topic bus.Topic, message ...bus.Message) (bus. _ = sendData() } //nolint: govet - return nil, err + return nil, nil +} + +func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, error) { + if !m.IsRemote() { + return nil, fmt.Errorf("message %d is not remote", m.ID()) + } + r := &clusterv1.SendRequest{ + Topic: topic.String(), + MessageId: uint64(m.ID()), + } + message, ok := m.Data().(proto.Message) + if !ok { + return nil, fmt.Errorf("invalid message type %T", m) + } + anyMessage, err := anypb.New(message) + if err != nil { + return nil, fmt.Errorf("failed to marshal message %T: %w", m, err) + } + r.Body = anyMessage + return r, nil +} + +type future struct { + clients []clusterv1.Service_SendClient +} + +func (l *future) Get() (bus.Message, error) { + if len(l.clients) < 1 { + return bus.Message{}, io.EOF + } + c := l.clients[0] + defer func() { + l.clients = l.clients[1:] + }() + resp, err := c.Recv() + if err != nil { + return bus.Message{}, err + } + return bus.NewMessage( + bus.MessageID(resp.MessageId), + resp.Body, + 
), nil +} + +func (l *future) GetAll() ([]bus.Message, error) { + var globalErr error + ret := make([]bus.Message, 0, len(l.clients)) + for { + m, err := l.Get() + if errors.Is(err, io.EOF) { + return ret, globalErr + } + if err != nil { + globalErr = multierr.Append(globalErr, err) + continue + } + ret = append(ret, m) + } } diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go index d899162e..bada6711 100644 --- a/banyand/queue/queue.go +++ b/banyand/queue/queue.go @@ -37,6 +37,7 @@ type Queue interface { type Client interface { run.Unit bus.Publisher + bus.Broadcaster NewBatchPublisher() BatchPublisher } diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go index d7e415da..78833713 100644 --- a/banyand/queue/sub/server.go +++ b/banyand/queue/sub/server.go @@ -59,7 +59,7 @@ type server struct { creds credentials.TransportCredentials log *logger.Logger ser *grpclib.Server - listeners map[bus.Topic][]bus.MessageListener + listeners map[bus.Topic]bus.MessageListener *clusterv1.UnimplementedServiceServer addr string certFile string diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go index d53d4fe1..c689eef0 100644 --- a/banyand/queue/sub/sub.go +++ b/banyand/queue/sub/sub.go @@ -22,15 +22,20 @@ import ( "io" "github.com/pkg/errors" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" "github.com/apache/skywalking-banyandb/pkg/bus" ) -// Write implements v1.ServiceServer. 
-func (s *server) Write(stream clusterv1.Service_WriteServer) error { - reply := func(stream clusterv1.Service_WriteServer) { - if errResp := stream.Send(&clusterv1.WriteResponse{}); errResp != nil { +func (s *server) Send(stream clusterv1.Service_SendServer) error { + reply := func(writeEntity *clusterv1.SendRequest, err error, message string) { + s.log.Error().Stringer("written", writeEntity).Err(err).Msg(message) + if errResp := stream.Send(&clusterv1.SendResponse{ + MessageId: writeEntity.MessageId, + Error: message, + }); errResp != nil { s.log.Err(errResp).Msg("failed to send response") } } @@ -47,8 +52,7 @@ func (s *server) Write(stream clusterv1.Service_WriteServer) error { return nil } if err != nil { - s.log.Error().Stringer("written", writeEntity).Err(err).Msg("failed to receive message") - reply(stream) + reply(writeEntity, err, "failed to receive message") continue } if writeEntity.Topic != "" && topic == nil { @@ -56,14 +60,36 @@ func (s *server) Write(stream clusterv1.Service_WriteServer) error { topic = &t } if topic == nil { - s.log.Error().Stringer("written", writeEntity).Msg("topic is empty") - reply(stream) + reply(writeEntity, err, "topic is empty") continue } - for _, listener := range s.getListeners(*topic) { - _ = listener.Rev(bus.NewMessage(bus.MessageID(writeEntity.MessageId), writeEntity.Request)) + listener := s.getListeners(*topic) + if listener == nil { + reply(writeEntity, err, "no listener found") + continue + } + m := listener.Rev(bus.NewMessage(bus.MessageID(writeEntity.MessageId), writeEntity.Body)) + if m.Data() == nil { + reply(writeEntity, err, "no response") + continue + } + message, ok := m.Data().(proto.Message) + if !ok { + reply(writeEntity, err, "invalid response") + continue + } + anyMessage, err := anypb.New(message) + if err != nil { + reply(writeEntity, err, "failed to marshal message") + continue + } + if err := stream.Send(&clusterv1.SendResponse{ + MessageId: writeEntity.MessageId, + Body: anyMessage, + }); err != 
nil { + reply(writeEntity, err, "failed to send response") + continue } - reply(stream) } } @@ -71,13 +97,13 @@ func (s *server) Subscribe(topic bus.Topic, listener bus.MessageListener) error s.listenersLock.Lock() defer s.listenersLock.Unlock() if _, ok := s.listeners[topic]; !ok { - s.listeners[topic] = make([]bus.MessageListener, 0) + s.listeners[topic] = listener + return nil } - s.listeners[topic] = append(s.listeners[topic], listener) - return nil + return errors.New("topic already exists") } -func (s *server) getListeners(topic bus.Topic) []bus.MessageListener { +func (s *server) getListeners(topic bus.Topic) bus.MessageListener { s.listenersLock.RLock() defer s.listenersLock.RUnlock() return s.listeners[topic] diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index e349872e..7a9b34df 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -19,9 +19,12 @@ package stream import ( "context" + "io" "path" "time" + "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" @@ -33,6 +36,8 @@ import ( resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" ) +var _ Query = (*schemaRepo)(nil) + type schemaRepo struct { resourceSchema.Repository l *logger.Logger @@ -42,7 +47,7 @@ type schemaRepo struct { func newSchemaRepo(path string, metadata metadata.Repo, bufferSize int64, dbOpts tsdb.DatabaseOpts, l *logger.Logger, ) schemaRepo { - return schemaRepo{ + r := schemaRepo{ l: l, metadata: metadata, Repository: resourceSchema.NewRepository( @@ -51,6 +56,38 @@ func newSchemaRepo(path string, metadata metadata.Repo, newSupplier(path, metadata, bufferSize, dbOpts, l), ), } + r.start() + return r +} + +// NewPortableRepository creates a new portable repository. 
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) Query { + r := &schemaRepo{ + l: l, + metadata: metadata, + Repository: resourceSchema.NewPortableRepository( + metadata, + l, + newPortableSupplier(metadata, l), + ), + } + r.start() + return r +} + +func (sr *schemaRepo) start() { + sr.Watcher() + sr.metadata.RegisterHandler("stream", + schema.KindGroup|schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule, + sr) +} + +func (sr *schemaRepo) Stream(metadata *commonv1.Metadata) (Stream, error) { + sm, ok := sr.loadStream(metadata) + if !ok { + return nil, errors.WithStack(errStreamNotExist) + } + return sm, nil } func (sr *schemaRepo) OnAddOrUpdate(m schema.Metadata) { @@ -156,7 +193,7 @@ func (sr *schemaRepo) loadStream(metadata *commonv1.Metadata) (*stream, bool) { if !ok { return nil, false } - s, ok := r.(*stream) + s, ok := r.Delegated().(*stream) return s, ok } @@ -180,11 +217,11 @@ func newSupplier(path string, metadata metadata.Repo, bufferSize int64, dbOpts t } } -func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resourceSchema.ResourceSpec) (resourceSchema.Resource, error) { - streamSchema := spec.Schema.(*databasev1.Stream) +func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, resource resourceSchema.Resource) (io.Closer, error) { + streamSchema := resource.Schema().(*databasev1.Stream) return openStream(shardNum, db, streamSpec{ schema: streamSchema, - indexRules: spec.IndexRules, + indexRules: resource.IndexRules(), }, s.l), nil } @@ -222,3 +259,21 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) { }), opts) } + +type portableSupplier struct { + metadata metadata.Repo + l *logger.Logger +} + +func newPortableSupplier(metadata metadata.Repo, l *logger.Logger) *portableSupplier { + return &portableSupplier{ + metadata: metadata, + l: l, + } +} + +func (s *portableSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { + 
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return s.metadata.StreamRegistry().GetStream(ctx, md) +} diff --git a/banyand/stream/metadata_test.go b/banyand/stream/metadata_test.go index dc184d40..8c4f3bfb 100644 --- a/banyand/stream/metadata_test.go +++ b/banyand/stream/metadata_test.go @@ -37,6 +37,10 @@ var _ = Describe("Metadata", func() { BeforeEach(func() { goods = gleak.Goroutines() svcs, deferFn = setUp() + Eventually(func() bool { + _, ok := svcs.stream.schemaRepo.LoadGroup("default") + return ok + }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue()) }) AfterEach(func() { @@ -45,12 +49,6 @@ var _ = Describe("Metadata", func() { }) Context("Manage group", func() { - It("should pass smoke test", func() { - Eventually(func() bool { - _, ok := svcs.stream.schemaRepo.LoadGroup("default") - return ok - }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue()) - }) It("should close the group", func() { deleted, err := svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "default") Expect(err).ShouldNot(HaveOccurred()) diff --git a/banyand/stream/service.go b/banyand/stream/service.go index 3af678d0..4d888cfb 100644 --- a/banyand/stream/service.go +++ b/banyand/stream/service.go @@ -27,7 +27,6 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" - "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/tsdb" @@ -62,11 +61,7 @@ type service struct { } func (s *service) Stream(metadata *commonv1.Metadata) (Stream, error) { - sm, ok := s.schemaRepo.loadStream(metadata) - if !ok { - return nil, errors.WithStack(errStreamNotExist) - } - return sm, nil + return 
s.schemaRepo.Stream(metadata) } func (s *service) FlagSet() *run.FlagSet { @@ -102,11 +97,6 @@ func (s *service) PreRun(_ context.Context) error { path := path.Join(s.root, s.Name()) observability.UpdatePath(path) s.schemaRepo = newSchemaRepo(path, s.metadata, int64(s.blockBufferSize), s.dbOpts, s.l) - // run a serial watcher - s.schemaRepo.Watcher() - s.metadata.RegisterHandler("stream", schema.KindGroup|schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule, - &s.schemaRepo) - s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo) errWrite := s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener) diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index fb60bd64..a989ebb2 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -22,33 +22,24 @@ package stream import ( "context" - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/banyand/tsdb/index" "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/schema" ) // a chunk is 1MB. 
const chunkSize = 1 << 20 -var _ schema.Resource = (*stream)(nil) - type stream struct { - db tsdb.Supplier - l *logger.Logger - schema *databasev1.Stream - indexWriter *index.Writer - name string - group string - indexRules []*databasev1.IndexRule - maxObservedModRevision int64 - shardNum uint32 -} - -func (s *stream) GetMetadata() *commonv1.Metadata { - return s.schema.Metadata + db tsdb.Supplier + l *logger.Logger + schema *databasev1.Stream + indexWriter *index.Writer + name string + group string + indexRules []*databasev1.IndexRule + shardNum uint32 } func (s *stream) GetSchema() *databasev1.Stream { @@ -59,21 +50,12 @@ func (s *stream) GetIndexRules() []*databasev1.IndexRule { return s.indexRules } -func (s *stream) MaxObservedModRevision() int64 { - return s.maxObservedModRevision -} - -func (s *stream) GetTopN() []*databasev1.TopNAggregation { - return nil -} - func (s *stream) Close() error { return nil } func (s *stream) parseSpec() { s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup() - s.maxObservedModRevision = schema.ParseMaxModRevision(s.indexRules) } type streamSpec struct { diff --git a/docs/api-reference.md b/docs/api-reference.md index 537ac018..96379185 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -3,6 +3,12 @@ ## Table of Contents +- [banyandb/cluster/v1/rpc.proto](#banyandb_cluster_v1_rpc-proto) + - [SendRequest](#banyandb-cluster-v1-SendRequest) + - [SendResponse](#banyandb-cluster-v1-SendResponse) + + - [Service](#banyandb-cluster-v1-Service) + - [banyandb/common/v1/common.proto](#banyandb_common_v1_common-proto) - [Group](#banyandb-common-v1-Group) - [IntervalRule](#banyandb-common-v1-IntervalRule) @@ -12,6 +18,12 @@ - [Catalog](#banyandb-common-v1-Catalog) - [IntervalRule.Unit](#banyandb-common-v1-IntervalRule-Unit) +- [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto) + - [Node](#banyandb-database-v1-Node) + - [Shard](#banyandb-database-v1-Shard) + + - 
[Role](#banyandb-database-v1-Role) + - [banyandb/model/v1/common.proto](#banyandb_model_v1_common-proto) - [FieldValue](#banyandb-model-v1-FieldValue) - [Float](#banyandb-model-v1-Float) @@ -24,30 +36,6 @@ - [AggregationFunction](#banyandb-model-v1-AggregationFunction) -- [banyandb/measure/v1/write.proto](#banyandb_measure_v1_write-proto) - - [DataPointValue](#banyandb-measure-v1-DataPointValue) - - [InternalWriteRequest](#banyandb-measure-v1-InternalWriteRequest) - - [WriteRequest](#banyandb-measure-v1-WriteRequest) - - [WriteResponse](#banyandb-measure-v1-WriteResponse) - -- [banyandb/stream/v1/write.proto](#banyandb_stream_v1_write-proto) - - [ElementValue](#banyandb-stream-v1-ElementValue) - - [InternalWriteRequest](#banyandb-stream-v1-InternalWriteRequest) - - [WriteRequest](#banyandb-stream-v1-WriteRequest) - - [WriteResponse](#banyandb-stream-v1-WriteResponse) - -- [banyandb/cluster/v1/rpc.proto](#banyandb_cluster_v1_rpc-proto) - - [WriteRequest](#banyandb-cluster-v1-WriteRequest) - - [WriteResponse](#banyandb-cluster-v1-WriteResponse) - - - [Service](#banyandb-cluster-v1-Service) - -- [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto) - - [Node](#banyandb-database-v1-Node) - - [Shard](#banyandb-database-v1-Shard) - - - [Role](#banyandb-database-v1-Role) - - [banyandb/model/v1/query.proto](#banyandb_model_v1_query-proto) - [Condition](#banyandb-model-v1-Condition) - [Criteria](#banyandb-model-v1-Criteria) @@ -180,6 +168,12 @@ - [TopNRequest](#banyandb-measure-v1-TopNRequest) - [TopNResponse](#banyandb-measure-v1-TopNResponse) +- [banyandb/measure/v1/write.proto](#banyandb_measure_v1_write-proto) + - [DataPointValue](#banyandb-measure-v1-DataPointValue) + - [InternalWriteRequest](#banyandb-measure-v1-InternalWriteRequest) + - [WriteRequest](#banyandb-measure-v1-WriteRequest) + - [WriteResponse](#banyandb-measure-v1-WriteResponse) + - [banyandb/measure/v1/rpc.proto](#banyandb_measure_v1_rpc-proto) - 
[MeasureService](#banyandb-measure-v1-MeasureService) @@ -206,6 +200,12 @@ - [QueryRequest](#banyandb-stream-v1-QueryRequest) - [QueryResponse](#banyandb-stream-v1-QueryResponse) +- [banyandb/stream/v1/write.proto](#banyandb_stream_v1_write-proto) + - [ElementValue](#banyandb-stream-v1-ElementValue) + - [InternalWriteRequest](#banyandb-stream-v1-InternalWriteRequest) + - [WriteRequest](#banyandb-stream-v1-WriteRequest) + - [WriteResponse](#banyandb-stream-v1-WriteResponse) + - [banyandb/stream/v1/rpc.proto](#banyandb_stream_v1_rpc-proto) - [StreamService](#banyandb-stream-v1-StreamService) @@ -213,6 +213,66 @@ +<a name="banyandb_cluster_v1_rpc-proto"></a> +<p align="right"><a href="#top">Top</a></p> + +## banyandb/cluster/v1/rpc.proto + + + +<a name="banyandb-cluster-v1-SendRequest"></a> + +### SendRequest + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| topic | [string](#string) | | | +| message_id | [uint64](#uint64) | | | +| body | [google.protobuf.Any](#google-protobuf-Any) | | | + + + + + + +<a name="banyandb-cluster-v1-SendResponse"></a> + +### SendResponse + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| message_id | [uint64](#uint64) | | | +| error | [string](#string) | | | +| body | [google.protobuf.Any](#google-protobuf-Any) | | | + + + + + + + + + + + + +<a name="banyandb-cluster-v1-Service"></a> + +### Service + + +| Method Name | Request Type | Response Type | Description | +| ----------- | ------------ | ------------- | ------------| +| Send | [SendRequest](#banyandb-cluster-v1-SendRequest) stream | [SendResponse](#banyandb-cluster-v1-SendResponse) stream | | + + + + + <a name="banyandb_common_v1_common-proto"></a> <p align="right"><a href="#top">Top</a></p> @@ -326,136 +386,47 @@ Metadata is for multi-tenant, multi-model use -<a name="banyandb_model_v1_common-proto"></a> +<a name="banyandb_database_v1_database-proto"></a> <p align="right"><a href="#top">Top</a></p> -## 
banyandb/model/v1/common.proto - - - -<a name="banyandb-model-v1-FieldValue"></a> - -### FieldValue - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| null | [google.protobuf.NullValue](#google-protobuf-NullValue) | | | -| str | [Str](#banyandb-model-v1-Str) | | | -| int | [Int](#banyandb-model-v1-Int) | | | -| binary_data | [bytes](#bytes) | | | -| float | [Float](#banyandb-model-v1-Float) | | | - - - - - - -<a name="banyandb-model-v1-Float"></a> - -### Float - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| value | [double](#double) | | | - - - - - - -<a name="banyandb-model-v1-Int"></a> - -### Int - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| value | [int64](#int64) | | | - - - - - - -<a name="banyandb-model-v1-IntArray"></a> - -### IntArray - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| value | [int64](#int64) | repeated | | - - - - - - -<a name="banyandb-model-v1-Str"></a> - -### Str - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| value | [string](#string) | | | - - - - - - -<a name="banyandb-model-v1-StrArray"></a> - -### StrArray - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| value | [string](#string) | repeated | | - - - +## banyandb/database/v1/database.proto -<a name="banyandb-model-v1-TagFamilyForWrite"></a> +<a name="banyandb-database-v1-Node"></a> -### TagFamilyForWrite +### Node | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| tags | [TagValue](#banyandb-model-v1-TagValue) | repeated | | +| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | +| roles | [Role](#banyandb-database-v1-Role) | repeated | | +| grpc_address | [string](#string) | | | +| http_address | [string](#string) | | | +| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) 
| | | -<a name="banyandb-model-v1-TagValue"></a> +<a name="banyandb-database-v1-Shard"></a> -### TagValue +### Shard | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| null | [google.protobuf.NullValue](#google-protobuf-NullValue) | | | -| str | [Str](#banyandb-model-v1-Str) | | | -| str_array | [StrArray](#banyandb-model-v1-StrArray) | | | -| int | [Int](#banyandb-model-v1-Int) | | | -| int_array | [IntArray](#banyandb-model-v1-IntArray) | | | -| binary_data | [bytes](#bytes) | | | +| id | [uint64](#uint64) | | | +| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | +| catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) | | | +| node | [string](#string) | | | +| total | [uint32](#uint32) | | | +| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | +| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | @@ -464,97 +435,18 @@ Metadata is for multi-tenant, multi-model use -<a name="banyandb-model-v1-AggregationFunction"></a> +<a name="banyandb-database-v1-Role"></a> -### AggregationFunction +### Role | Name | Number | Description | | ---- | ------ | ----------- | -| AGGREGATION_FUNCTION_UNSPECIFIED | 0 | | -| AGGREGATION_FUNCTION_MEAN | 1 | | -| AGGREGATION_FUNCTION_MAX | 2 | | -| AGGREGATION_FUNCTION_MIN | 3 | | -| AGGREGATION_FUNCTION_COUNT | 4 | | -| AGGREGATION_FUNCTION_SUM | 5 | | - - - - - - - - - - -<a name="banyandb_measure_v1_write-proto"></a> -<p align="right"><a href="#top">Top</a></p> - -## banyandb/measure/v1/write.proto - - - -<a name="banyandb-measure-v1-DataPointValue"></a> - -### DataPointValue -DataPointValue is the data point for writing. It only contains values. - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | timestamp is in the timeunit of milliseconds. 
| -| tag_families | [banyandb.model.v1.TagFamilyForWrite](#banyandb-model-v1-TagFamilyForWrite) | repeated | the order of tag_families' items match the measure schema | -| fields | [banyandb.model.v1.FieldValue](#banyandb-model-v1-FieldValue) | repeated | the order of fields match the measure schema | - - - - - - -<a name="banyandb-measure-v1-InternalWriteRequest"></a> - -### InternalWriteRequest - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| shard_id | [uint32](#uint32) | | | -| series_hash | [bytes](#bytes) | | | -| entity_values | [banyandb.model.v1.TagValue](#banyandb-model-v1-TagValue) | repeated | | -| request | [WriteRequest](#banyandb-measure-v1-WriteRequest) | | | - - - - - - -<a name="banyandb-measure-v1-WriteRequest"></a> - -### WriteRequest -WriteRequest is the request contract for write - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata is required. | -| data_point | [DataPointValue](#banyandb-measure-v1-DataPointValue) | | the data_point is required. 
| - - - - - - -<a name="banyandb-measure-v1-WriteResponse"></a> - -### WriteResponse -WriteResponse is the response contract for write - - - - +| ROLE_UNSPECIFIED | 0 | | +| ROLE_META | 1 | | +| ROLE_DATA | 2 | | +| ROLE_LIAISON | 3 | | - @@ -564,178 +456,136 @@ WriteResponse is the response contract for write -<a name="banyandb_stream_v1_write-proto"></a> +<a name="banyandb_model_v1_common-proto"></a> <p align="right"><a href="#top">Top</a></p> -## banyandb/stream/v1/write.proto - - - -<a name="banyandb-stream-v1-ElementValue"></a> - -### ElementValue - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| element_id | [string](#string) | | element_id could be span_id of a Span or segment_id of a Segment in the context of stream | -| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | timestamp is in the timeunit of milliseconds. It represents 1) either the start time of a Span/Segment, 2) or the timestamp of a log | -| tag_families | [banyandb.model.v1.TagFamilyForWrite](#banyandb-model-v1-TagFamilyForWrite) | repeated | the order of tag_families' items match the stream schema | - - - - - - -<a name="banyandb-stream-v1-InternalWriteRequest"></a> - -### InternalWriteRequest - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| shard_id | [uint32](#uint32) | | | -| series_hash | [bytes](#bytes) | | | -| entity_values | [banyandb.model.v1.TagValue](#banyandb-model-v1-TagValue) | repeated | | -| request | [WriteRequest](#banyandb-stream-v1-WriteRequest) | | | - - - +## banyandb/model/v1/common.proto -<a name="banyandb-stream-v1-WriteRequest"></a> +<a name="banyandb-model-v1-FieldValue"></a> -### WriteRequest +### FieldValue | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata is only required in the first write. 
| -| element | [ElementValue](#banyandb-stream-v1-ElementValue) | | the element is required. | +| null | [google.protobuf.NullValue](#google-protobuf-NullValue) | | | +| str | [Str](#banyandb-model-v1-Str) | | | +| int | [Int](#banyandb-model-v1-Int) | | | +| binary_data | [bytes](#bytes) | | | +| float | [Float](#banyandb-model-v1-Float) | | | -<a name="banyandb-stream-v1-WriteResponse"></a> +<a name="banyandb-model-v1-Float"></a> -### WriteResponse +### Float +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| value | [double](#double) | | | - - - - +<a name="banyandb-model-v1-Int"></a> +### Int -<a name="banyandb_cluster_v1_rpc-proto"></a> -<p align="right"><a href="#top">Top</a></p> -## banyandb/cluster/v1/rpc.proto +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| value | [int64](#int64) | | | -<a name="banyandb-cluster-v1-WriteRequest"></a> -### WriteRequest +<a name="banyandb-model-v1-IntArray"></a> -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| topic | [string](#string) | | | -| message_id | [uint64](#uint64) | | | -| stream | [banyandb.stream.v1.InternalWriteRequest](#banyandb-stream-v1-InternalWriteRequest) | | | -| measure | [banyandb.measure.v1.InternalWriteRequest](#banyandb-measure-v1-InternalWriteRequest) | | | +### IntArray +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| value | [int64](#int64) | repeated | | -<a name="banyandb-cluster-v1-WriteResponse"></a> -### WriteResponse +<a name="banyandb-model-v1-Str"></a> +### Str - +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| value | [string](#string) | | | - - -<a name="banyandb-cluster-v1-Service"></a> -### Service +<a name="banyandb-model-v1-StrArray"></a> -| Method Name | Request Type | Response Type | Description | -| ----------- | ------------ | ------------- | ------------| -| Write | 
[WriteRequest](#banyandb-cluster-v1-WriteRequest) stream | [WriteResponse](#banyandb-cluster-v1-WriteResponse) stream | | +### StrArray - +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| value | [string](#string) | repeated | | -<a name="banyandb_database_v1_database-proto"></a> -<p align="right"><a href="#top">Top</a></p> -## banyandb/database/v1/database.proto -<a name="banyandb-database-v1-Node"></a> -### Node +<a name="banyandb-model-v1-TagFamilyForWrite"></a> + +### TagFamilyForWrite | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | -| roles | [Role](#banyandb-database-v1-Role) | repeated | | -| grpc_address | [string](#string) | | | -| http_address | [string](#string) | | | -| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | +| tags | [TagValue](#banyandb-model-v1-TagValue) | repeated | | -<a name="banyandb-database-v1-Shard"></a> +<a name="banyandb-model-v1-TagValue"></a> -### Shard +### TagValue | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| id | [uint64](#uint64) | | | -| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | -| catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) | | | -| node | [string](#string) | | | -| total | [uint32](#uint32) | | | -| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | -| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | +| null | [google.protobuf.NullValue](#google-protobuf-NullValue) | | | +| str | [Str](#banyandb-model-v1-Str) | | | +| str_array | [StrArray](#banyandb-model-v1-StrArray) | | | +| int | [Int](#banyandb-model-v1-Int) | | | +| int_array | [IntArray](#banyandb-model-v1-IntArray) | | | +| binary_data | [bytes](#bytes) | | | @@ -744,18 +594,19 @@ WriteResponse is the response contract for write -<a 
name="banyandb-database-v1-Role"></a> +<a name="banyandb-model-v1-AggregationFunction"></a> -### Role +### AggregationFunction | Name | Number | Description | | ---- | ------ | ----------- | -| ROLE_UNSPECIFIED | 0 | | -| ROLE_META | 1 | | -| ROLE_DATA | 2 | | -| ROLE_QUERY | 3 | | -| ROLE_LIAISON | 4 | | +| AGGREGATION_FUNCTION_UNSPECIFIED | 0 | | +| AGGREGATION_FUNCTION_MEAN | 1 | | +| AGGREGATION_FUNCTION_MAX | 2 | | +| AGGREGATION_FUNCTION_MIN | 3 | | +| AGGREGATION_FUNCTION_COUNT | 4 | | +| AGGREGATION_FUNCTION_SUM | 5 | | @@ -2647,6 +2498,83 @@ TopNResponse is the response for a query to the Query module. +<a name="banyandb_measure_v1_write-proto"></a> +<p align="right"><a href="#top">Top</a></p> + +## banyandb/measure/v1/write.proto + + + +<a name="banyandb-measure-v1-DataPointValue"></a> + +### DataPointValue +DataPointValue is the data point for writing. It only contains values. + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | timestamp is in the timeunit of milliseconds. 
| +| tag_families | [banyandb.model.v1.TagFamilyForWrite](#banyandb-model-v1-TagFamilyForWrite) | repeated | the order of tag_families' items match the measure schema | +| fields | [banyandb.model.v1.FieldValue](#banyandb-model-v1-FieldValue) | repeated | the order of fields match the measure schema | + + + + + + +<a name="banyandb-measure-v1-InternalWriteRequest"></a> + +### InternalWriteRequest + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| shard_id | [uint32](#uint32) | | | +| series_hash | [bytes](#bytes) | | | +| entity_values | [banyandb.model.v1.TagValue](#banyandb-model-v1-TagValue) | repeated | | +| request | [WriteRequest](#banyandb-measure-v1-WriteRequest) | | | + + + + + + +<a name="banyandb-measure-v1-WriteRequest"></a> + +### WriteRequest +WriteRequest is the request contract for write + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata is required. | +| data_point | [DataPointValue](#banyandb-measure-v1-DataPointValue) | | the data_point is required. | + + + + + + +<a name="banyandb-measure-v1-WriteResponse"></a> + +### WriteResponse +WriteResponse is the response contract for write + + + + + + + + + + + + + + + <a name="banyandb_measure_v1_rpc-proto"></a> <p align="right"><a href="#top">Top</a></p> @@ -2964,6 +2892,83 @@ QueryResponse is the response for a query to the Query module. 
+<a name="banyandb_stream_v1_write-proto"></a> +<p align="right"><a href="#top">Top</a></p> + +## banyandb/stream/v1/write.proto + + + +<a name="banyandb-stream-v1-ElementValue"></a> + +### ElementValue + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| element_id | [string](#string) | | element_id could be span_id of a Span or segment_id of a Segment in the context of stream | +| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | timestamp is in the timeunit of milliseconds. It represents 1) either the start time of a Span/Segment, 2) or the timestamp of a log | +| tag_families | [banyandb.model.v1.TagFamilyForWrite](#banyandb-model-v1-TagFamilyForWrite) | repeated | the order of tag_families' items match the stream schema | + + + + + + +<a name="banyandb-stream-v1-InternalWriteRequest"></a> + +### InternalWriteRequest + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| shard_id | [uint32](#uint32) | | | +| series_hash | [bytes](#bytes) | | | +| entity_values | [banyandb.model.v1.TagValue](#banyandb-model-v1-TagValue) | repeated | | +| request | [WriteRequest](#banyandb-stream-v1-WriteRequest) | | | + + + + + + +<a name="banyandb-stream-v1-WriteRequest"></a> + +### WriteRequest + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | the metadata is only required in the first write. | +| element | [ElementValue](#banyandb-stream-v1-ElementValue) | | the element is required. 
| + + + + + + +<a name="banyandb-stream-v1-WriteResponse"></a> + +### WriteResponse + + + + + + + + + + + + + + + + <a name="banyandb_stream_v1_rpc-proto"></a> <p align="right"><a href="#top">Top</a></p> diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index c7a8247b..6ad65ada 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -94,6 +94,11 @@ type Publisher interface { Publish(topic Topic, message ...Message) (Future, error) } +// Broadcaster allow sending Messages to a Topic and receiving the responses. +type Broadcaster interface { + Broadcast(topic Topic, message Message) ([]Future, error) +} + type channel chan event type chType int diff --git a/pkg/iter/sort/sort.go b/pkg/iter/sort/sort.go new file mode 100644 index 00000000..b0d67eb5 --- /dev/null +++ b/pkg/iter/sort/sort.go @@ -0,0 +1,133 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package sort provides a generic iterator that merges multiple sorted iterators. +package sort + +import ( + "bytes" + "container/heap" + "fmt" +) + +// Comparable is an interface that allows sorting of items. +type Comparable interface { + SortedField() []byte +} + +// Iterator is a stream of items of Comparable type. 
+type Iterator[T Comparable] interface { + Next() bool + Val() T + Close() error +} + +type container[T Comparable] struct { + item T + iter Iterator[T] +} + +type containerHeap[T Comparable] struct { + items []*container[T] + desc bool +} + +func (h containerHeap[T]) Len() int { + return len(h.items) +} + +func (h containerHeap[T]) Less(i, j int) bool { + if h.desc { + return bytes.Compare(h.items[i].item.SortedField(), h.items[j].item.SortedField()) > 0 + } + return bytes.Compare(h.items[i].item.SortedField(), h.items[j].item.SortedField()) < 0 +} + +func (h containerHeap[T]) Swap(i, j int) { + h.items[i], h.items[j] = h.items[j], h.items[i] +} + +func (h *containerHeap[T]) Push(x interface{}) { + item := x.(*container[T]) + h.items = append(h.items, item) +} + +func (h *containerHeap[T]) Pop() interface{} { + old := h.items + n := len(old) + item := old[n-1] + h.items = old[0 : n-1] + return item +} + +type itemIter[T Comparable] struct { + curr T + h *containerHeap[T] + iters []Iterator[T] +} + +// NewItemIter returns a new iterator that merges multiple sorted iterators. 
+func NewItemIter[T Comparable](iters []Iterator[T], desc bool) Iterator[T] { + var def T + it := &itemIter[T]{ + iters: iters, + h: &containerHeap[T]{items: make([]*container[T], 0), desc: desc}, + curr: def, + } + it.initialize() + return it +} + +func (it *itemIter[T]) initialize() { + heap.Init(it.h) + for _, iter := range it.iters { + if iter.Next() { + heap.Push(it.h, &container[T]{item: iter.Val(), iter: iter}) + } + } +} + +func (it *itemIter[T]) pushIterator(iter Iterator[T]) { + if iter.Next() { + heap.Push(it.h, &container[T]{item: iter.Val(), iter: iter}) + } +} + +func (it *itemIter[T]) Next() bool { + if it.h.Len() == 0 { + return false + } + top := heap.Pop(it.h).(*container[T]) + it.curr = top.item + it.pushIterator(top.iter) + return true +} + +func (it *itemIter[T]) Val() T { + return it.curr +} + +func (it *itemIter[T]) Close() error { + var err error + for _, iter := range it.iters { + if e := iter.Close(); e != nil { + fmt.Println("Error closing iterator:", e) + err = e + } + } + return err +} diff --git a/pkg/iter/sort/sort_test.go b/pkg/iter/sort/sort_test.go new file mode 100644 index 00000000..e91a0f96 --- /dev/null +++ b/pkg/iter/sort/sort_test.go @@ -0,0 +1,102 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package sort_test + +import ( + "encoding/binary" + "testing" + + "github.com/apache/skywalking-banyandb/pkg/iter/sort" +) + +type Int int + +func (i Int) SortedField() []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(i)) + return b +} + +// MockIterator is a mock for the Iterator interface. +type MockIterator struct { + items []Int + index int +} + +func NewMockIterator(items []Int) *MockIterator { + return &MockIterator{items: items, index: -1} +} + +func (mi *MockIterator) Next() bool { + mi.index++ + return mi.index < len(mi.items) +} + +func (mi *MockIterator) Val() Int { + return mi.items[mi.index] +} + +func (mi *MockIterator) Close() error { + return nil +} + +func TestItemIter_Ascending(t *testing.T) { + iters := []sort.Iterator[Int]{ + NewMockIterator([]Int{1, 3, 5}), + NewMockIterator([]Int{2, 4, 6}), + } + + iter := sort.NewItemIter(iters, false) // Ascending order + for i := Int(1); i <= 6; i++ { + if !iter.Next() { + t.Fatalf("expected Next() to be true, got false") + } + if val := iter.Val(); val != i { + t.Errorf("expected Val() to be %d, got %d", i, val) + } + } + if iter.Next() { + t.Errorf("expected Next() to be false, got true") + } + if err := iter.Close(); err != nil { + t.Errorf("expected Close() to return nil, got error: %v", err) + } +} + +func TestItemIter_Descending(t *testing.T) { + iters := []sort.Iterator[Int]{ + NewMockIterator([]Int{5, 3, 1}), + NewMockIterator([]Int{6, 4, 2}), + } + + iter := sort.NewItemIter(iters, true) // Descending order + for i := Int(6); i >= 1; i-- { + if !iter.Next() { + t.Fatalf("expected Next() to be true, got false") + } + if val := iter.Val(); val != i { + t.Errorf("expected Val() to be %d, got %d", i, val) + } + } + if iter.Next() { + t.Errorf("expected Next() to be false, got true") + } + if err := iter.Close(); err != nil { + t.Errorf("expected Close() to return nil, got 
error: %v", err) + } +} diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go index b5b184ae..a206293b 100644 --- a/pkg/query/executor/interface.go +++ b/pkg/query/executor/interface.go @@ -19,11 +19,14 @@ package executor import ( + "context" + "github.com/apache/skywalking-banyandb/api/common" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/bus" ) // ExecutionContext allows retrieving data from tsdb. @@ -39,9 +42,24 @@ type StreamExecutionContext interface { ParseElementID(item tsdb.Item) (string, error) } +// StreamExecutionContextKey is the key of stream execution context in context.Context. +type StreamExecutionContextKey struct{} + +var streamExecutionContextKeyInstance = StreamExecutionContextKey{} + +// WithStreamExecutionContext returns a new context with stream execution context. +func WithStreamExecutionContext(ctx context.Context, ec StreamExecutionContext) context.Context { + return context.WithValue(ctx, streamExecutionContextKeyInstance, ec) +} + +// FromStreamExecutionContext returns the stream execution context from context.Context. +func FromStreamExecutionContext(ctx context.Context) StreamExecutionContext { + return ctx.Value(streamExecutionContextKeyInstance).(StreamExecutionContext) +} + // StreamExecutable allows querying in the stream schema. type StreamExecutable interface { - Execute(StreamExecutionContext) ([]*streamv1.Element, error) + Execute(context.Context) ([]*streamv1.Element, error) } // MeasureExecutionContext allows retrieving data through the measure module. 
@@ -50,6 +68,21 @@ type MeasureExecutionContext interface { ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_Field, error) } +// MeasureExecutionContextKey is the key of measure execution context in context.Context. +type MeasureExecutionContextKey struct{} + +var measureExecutionContextKeyInstance = MeasureExecutionContextKey{} + +// WithMeasureExecutionContext returns a new context with measure execution context. +func WithMeasureExecutionContext(ctx context.Context, ec MeasureExecutionContext) context.Context { + return context.WithValue(ctx, measureExecutionContextKeyInstance, ec) +} + +// FromMeasureExecutionContext returns the measure execution context from context.Context. +func FromMeasureExecutionContext(ctx context.Context) MeasureExecutionContext { + return ctx.Value(measureExecutionContextKeyInstance).(MeasureExecutionContext) +} + // MIterator allows iterating in a measure data set. type MIterator interface { Next() bool @@ -61,5 +94,26 @@ type MIterator interface { // MeasureExecutable allows querying in the measure schema. type MeasureExecutable interface { - Execute(MeasureExecutionContext) (MIterator, error) + Execute(context.Context) (MIterator, error) +} + +// DistributedExecutionContext allows retrieving data through the distributed module. +type DistributedExecutionContext interface { + bus.Broadcaster + TimeRange() *modelv1.TimeRange +} + +// DistributedExecutionContextKey is the key of distributed execution context in context.Context. +type DistributedExecutionContextKey struct{} + +var distributedExecutionContextKeyInstance = DistributedExecutionContextKey{} + +// WithDistributedExecutionContext returns a new context with distributed execution context. 
+func WithDistributedExecutionContext(ctx context.Context, ec DistributedExecutionContext) context.Context { + return context.WithValue(ctx, distributedExecutionContextKeyInstance, ec) +} + +// FromDistributedExecutionContext returns the distributed execution context from context.Context. +func FromDistributedExecutionContext(ctx context.Context) DistributedExecutionContext { + return ctx.Value(distributedExecutionContextKeyInstance).(DistributedExecutionContext) } diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go index 73a46032..4297c296 100644 --- a/pkg/query/logical/common.go +++ b/pkg/query/logical/common.go @@ -18,7 +18,6 @@ package logical import ( - "bytes" "context" "io" "time" @@ -28,6 +27,7 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/iter/sort" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -48,20 +48,8 @@ type ( // SeekerBuilder wraps the execution of tsdb.SeekerBuilder. // TODO:// we could have a chance to remove this wrapper. SeekerBuilder func(builder tsdb.SeekerBuilder) - - comparator func(a, b tsdb.Item) bool ) -func createComparator(sortDirection modelv1.Sort) comparator { - return func(a, b tsdb.Item) bool { - comp := bytes.Compare(a.SortedField(), b.SortedField()) - if sortDirection == modelv1.Sort_SORT_DESC { - return comp == 1 - } - return comp == -1 - } -} - // ProjectItem parses the item within the StreamExecutionContext. // projectionFieldRefs must be prepared before calling this method, projectionFieldRefs should be a list of // tag list where the inner list must exist in the same tag family. @@ -104,16 +92,16 @@ func ProjectItem(ec executor.ExecutionContext, item tsdb.Item, projectionFieldRe // with the help of Entity. 
The result is a list of element set, where the order of inner list is kept // as what the users specify in the seekerBuilder. // This method is used by the underlying tableScan and localIndexScan plans. -func ExecuteForShard(l *logger.Logger, series tsdb.SeriesList, timeRange timestamp.TimeRange, +func ExecuteForShard(ctx context.Context, l *logger.Logger, series tsdb.SeriesList, timeRange timestamp.TimeRange, builders ...SeekerBuilder, ) ([]tsdb.Iterator, []io.Closer, error) { var itersInShard []tsdb.Iterator var closers []io.Closer for _, seriesFound := range series { itersInSeries, err := func() ([]tsdb.Iterator, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctxSeries, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - sp, errInner := seriesFound.Span(context.WithValue(ctx, logger.ContextKey, l), timeRange) + sp, errInner := seriesFound.Span(context.WithValue(ctxSeries, logger.ContextKey, l), timeRange) if errInner != nil { if errors.Is(errInner, tsdb.ErrEmptySeriesSpan) { return nil, nil @@ -222,3 +210,15 @@ func StringSlicesEqual(a, b []string) bool { } return true } + +// NewItemIter returns a ItemIterator which mergers several tsdb.Iterator by input sorting order. +func NewItemIter(iters []tsdb.Iterator, s modelv1.Sort) sort.Iterator[tsdb.Item] { + var ii []sort.Iterator[tsdb.Item] + for _, iter := range iters { + ii = append(ii, iter) + } + if s == modelv1.Sort_SORT_DESC { + return sort.NewItemIter[tsdb.Item](ii, true) + } + return sort.NewItemIter[tsdb.Item](ii, false) +} diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/index_filter.go index 0d16d180..4ab9ff01 100644 --- a/pkg/query/logical/index_filter.go +++ b/pkg/query/logical/index_filter.go @@ -47,7 +47,9 @@ func (g GlobalIndexError) Error() string { return g.IndexRule.String() } // BuildLocalFilter returns a new index.Filter for local indices. // It could parse series Path at the same time. 
-func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[string]int, entity tsdb.Entity) (index.Filter, []tsdb.Entity, error) { +func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[string]int, + entity tsdb.Entity, mandatoryIndexRule bool, +) (index.Filter, []tsdb.Entity, error) { if criteria == nil { return nil, []tsdb.Entity{entity}, nil } @@ -74,6 +76,8 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[ } } return parseCondition(cond, indexRule, expr, entity) + } else if mandatoryIndexRule { + return nil, nil, errors.Wrapf(errUnsupportedConditionOp, "mandatory index rule conf:%s", cond) } return eNode, []tsdb.Entity{entity}, nil case *modelv1.Criteria_Le: @@ -82,16 +86,16 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[ return nil, nil, errors.WithMessagef(errInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) } if le.GetLeft() == nil { - return BuildLocalFilter(le.Right, schema, entityDict, entity) + return BuildLocalFilter(le.Right, schema, entityDict, entity, mandatoryIndexRule) } if le.GetRight() == nil { - return BuildLocalFilter(le.Left, schema, entityDict, entity) + return BuildLocalFilter(le.Left, schema, entityDict, entity, mandatoryIndexRule) } - left, leftEntities, err := BuildLocalFilter(le.Left, schema, entityDict, entity) + left, leftEntities, err := BuildLocalFilter(le.Left, schema, entityDict, entity, mandatoryIndexRule) if err != nil { return nil, nil, err } - right, rightEntities, err := BuildLocalFilter(le.Right, schema, entityDict, entity) + right, rightEntities, err := BuildLocalFilter(le.Right, schema, entityDict, entity, mandatoryIndexRule) if err != nil { return nil, nil, err } diff --git a/pkg/query/logical/iter.go b/pkg/query/logical/iter.go deleted file mode 100644 index e522969a..00000000 --- a/pkg/query/logical/iter.go +++ /dev/null @@ -1,134 +0,0 @@ -// Licensed to Apache Software 
Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package logical - -import ( - "container/heap" - "io" - - "go.uber.org/multierr" - - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/banyand/tsdb" -) - -var _ ItemIterator = (*itemIter)(nil) - -// ItemIterator allow iterating over a tsdb's series. -type ItemIterator interface { - HasNext() bool - Next() tsdb.Item - io.Closer -} - -var _ heap.Interface = (*containerHeap)(nil) - -// container contains both iter and its current item. 
-type container struct { - c comparator - item tsdb.Item - iter tsdb.Iterator -} - -type containerHeap []*container - -func (h containerHeap) Len() int { return len(h) } -func (h containerHeap) Less(i, j int) bool { return h[i].c(h[i].item, h[j].item) } -func (h containerHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } - -func (h *containerHeap) Push(x interface{}) { - *h = append(*h, x.(*container)) -} - -func (h *containerHeap) Pop() interface{} { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} - -type itemIter struct { - c comparator - h *containerHeap - iters []tsdb.Iterator -} - -// NewItemIter returns a ItemIterator which mergers several tsdb.Iterator by input sorting order. -func NewItemIter(iters []tsdb.Iterator, sort modelv1.Sort) ItemIterator { - it := &itemIter{ - c: createComparator(sort), - iters: iters, - h: &containerHeap{}, - } - it.init() - return it -} - -// init function MUST be called while initialization. -// 1. Move all iterator to the first item by invoking their Next. -// 2. Load all first items into a slice. -func (it *itemIter) init() { - for _, iter := range it.iters { - it.pushIterator(iter) - } - // heap initialization - heap.Init(it.h) -} - -// pushIterator pushes the given iterator into the underlying deque. -// Status will be immediately checked if the Iterator has a next value. -// 1 - If not, it will be close at once and will not be added to the slice, -// -// which means inactive iterator does not exist in the deq. -// -// 2 - If so, it will be wrapped into a container and push to the deq. -// -// Then we call SliceStable sort to sort the deq. -func (it *itemIter) pushIterator(iter tsdb.Iterator) { - if !iter.Next() { - return - } - heap.Push(it.h, &container{ - item: iter.Val(), - iter: iter, - c: it.c, - }) -} - -func (it *itemIter) HasNext() bool { - return it.h.Len() > 0 -} - -func (it *itemIter) Next() tsdb.Item { - // 3. 
Pop up the minimal item through the order value - c := heap.Pop(it.h).(*container) - - // 4. Move the iterator whose value is popped in step 3, push the next value of this iterator into the slice. - it.pushIterator(c.iter) - - return c.item -} - -// Close closes all underlying iterators. -func (it *itemIter) Close() (err error) { - for _, iter := range it.iters { - err = multierr.Append(err, iter.Close()) - } - return err -} diff --git a/pkg/query/logical/measure/measure_analyzer.go b/pkg/query/logical/measure/measure_analyzer.go index 30a3623d..0e7f0fff 100644 --- a/pkg/query/logical/measure/measure_analyzer.go +++ b/pkg/query/logical/measure/measure_analyzer.go @@ -22,21 +22,20 @@ import ( "math" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" - "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/pkg/query/logical" ) const defaultLimit uint32 = 100 // BuildSchema returns Schema loaded from the metadata repository. -func BuildSchema(measureSchema measure.Measure) (logical.Schema, error) { - md := measureSchema.GetSchema() +func BuildSchema(md *databasev1.Measure, indexRules []*databasev1.IndexRule) (logical.Schema, error) { md.GetEntity() ms := &schema{ common: &logical.CommonSchema{ - IndexRules: measureSchema.GetIndexRules(), + IndexRules: indexRules, TagSpecMap: make(map[string]*logical.TagSpec), EntityList: md.GetEntity().GetTagNames(), }, @@ -113,6 +112,66 @@ func Analyze(_ context.Context, criteria *measurev1.QueryRequest, metadata *comm return p, nil } +// DistributedAnalyze converts logical expressions to executable operation tree represented by Plan. 
+func DistributedAnalyze(criteria *measurev1.QueryRequest, s logical.Schema) (logical.Plan, error) { + groupByEntity := false + var groupByTags [][]*logical.Tag + if criteria.GetGroupBy() != nil { + groupByProjectionTags := criteria.GetGroupBy().GetTagProjection() + groupByTags = make([][]*logical.Tag, len(groupByProjectionTags.GetTagFamilies())) + tags := make([]string, 0) + for i, tagFamily := range groupByProjectionTags.GetTagFamilies() { + groupByTags[i] = logical.NewTags(tagFamily.GetName(), tagFamily.GetTags()...) + tags = append(tags, tagFamily.GetTags()...) + } + if logical.StringSlicesEqual(s.EntityList(), tags) { + groupByEntity = true + } + } + + // parse fields + plan := newUnresolvedDistributed(criteria) + + // parse limit and offset + limitParameter := criteria.GetLimit() + if limitParameter == 0 { + limitParameter = defaultLimit + } + pushedLimit := int(limitParameter + criteria.GetOffset()) + + if criteria.GetGroupBy() != nil { + plan = newUnresolvedGroupBy(plan, groupByTags, groupByEntity) + pushedLimit = math.MaxInt + } + + if criteria.GetAgg() != nil { + plan = newUnresolvedAggregation(plan, + logical.NewField(criteria.GetAgg().GetFieldName()), + criteria.GetAgg().GetFunction(), + criteria.GetGroupBy() != nil, + ) + pushedLimit = math.MaxInt + } + + if criteria.GetTop() != nil { + plan = top(plan, criteria.GetTop()) + } + + plan = limit(plan, criteria.GetOffset(), limitParameter) + p, err := plan.Analyze(s) + if err != nil { + return nil, err + } + rules := []logical.OptimizeRule{ + logical.NewPushDownOrder(criteria.OrderBy), + logical.NewPushDownMaxSize(pushedLimit), + } + if err := logical.ApplyRules(p, rules...); err != nil { + return nil, err + } + return p, nil +} + // parseFields parses the query request to decide which kind of plan should be generated // Basically, // 1 - If no criteria is given, we can only scan all shards diff --git a/pkg/query/logical/measure/measure_plan.go b/pkg/query/logical/measure/measure_plan.go index 
870e5af9..d9bc6a68 100644 --- a/pkg/query/logical/measure/measure_plan.go +++ b/pkg/query/logical/measure/measure_plan.go @@ -18,6 +18,7 @@ package measure import ( + "context" "fmt" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" @@ -36,7 +37,7 @@ type limitPlan struct { limit uint32 } -func (l *limitPlan) Execute(ec executor.MeasureExecutionContext) (executor.MIterator, error) { +func (l *limitPlan) Execute(ec context.Context) (executor.MIterator, error) { dps, err := l.Parent.Input.(executor.MeasureExecutable).Execute(ec) if err != nil { return nil, err diff --git a/pkg/query/logical/measure/measure_plan_aggregation.go b/pkg/query/logical/measure/measure_plan_aggregation.go index 480ff273..e304119e 100644 --- a/pkg/query/logical/measure/measure_plan_aggregation.go +++ b/pkg/query/logical/measure/measure_plan_aggregation.go @@ -18,6 +18,7 @@ package measure import ( + "context" "fmt" "github.com/pkg/errors" @@ -121,7 +122,7 @@ func (g *aggregationPlan[N]) Schema() logical.Schema { return g.schema.ProjFields(g.aggregationFieldRef) } -func (g *aggregationPlan[N]) Execute(ec executor.MeasureExecutionContext) (executor.MIterator, error) { +func (g *aggregationPlan[N]) Execute(ec context.Context) (executor.MIterator, error) { iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec) if err != nil { return nil, err diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go new file mode 100644 index 00000000..45ee4c66 --- /dev/null +++ b/pkg/query/logical/measure/measure_plan_distributed.go @@ -0,0 +1,234 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"context"
+	"fmt"
+
+	"go.uber.org/multierr"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-banyandb/api/data"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/iter/sort"
+	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/executor"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
+
+// unresolvedDistributed is the liaison-side plan of a measure query that is
+// broadcast to data nodes. Limit and Sort record the paging and ordering
+// that Analyze copies into the per-node query template.
+type unresolvedDistributed struct {
+	originalQuery     *measurev1.QueryRequest
+	order             *logical.OrderBy
+	maxDataPointsSize int
+}
+
+func newUnresolvedDistributed(query *measurev1.QueryRequest) logical.UnresolvedPlan {
+	return &unresolvedDistributed{
+		originalQuery: query,
+	}
+}
+
+// Limit sets the maximum number of data points requested from each data node.
+func (ud *unresolvedDistributed) Limit(max int) {
+	ud.maxDataPointsSize = max
+}
+
+// Sort sets the ordering used on the data nodes and when merging their replies.
+func (ud *unresolvedDistributed) Sort(order *logical.OrderBy) {
+	ud.order = order
+}
+
+// Analyze validates the projections and builds the query template sent to
+// every data node. Replies are merged by timestamp unless the order names an
+// index rule, in which case they are merged by that rule's single tag.
+func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) {
+	if ud.originalQuery.TagProjection == nil {
+		return nil, fmt.Errorf("tag projection is required")
+	}
+	if ud.originalQuery.FieldProjection == nil {
+		return nil, fmt.Errorf("field projection is required")
+	}
+	temp := &measurev1.QueryRequest{
+		TagProjection:   ud.originalQuery.TagProjection,
+		FieldProjection: ud.originalQuery.FieldProjection,
+		Metadata:        ud.originalQuery.Metadata,
+		Criteria:        ud.originalQuery.Criteria,
+		Limit:           uint32(ud.maxDataPointsSize),
+	}
+	// No order at all, or an order carrying only a direction (nil Index),
+	// both mean "order by timestamp". OrderBy is only derived from ud.order
+	// once it is known to be non-nil.
+	if ud.order == nil || ud.order.Index == nil {
+		plan := &distributedPlan{
+			queryTemplate: temp,
+			s:             s,
+			sortByTime:    true,
+		}
+		if ud.order != nil {
+			temp.OrderBy = &modelv1.QueryOrder{Sort: ud.order.Sort}
+			plan.desc = ud.order.Sort == modelv1.Sort_SORT_DESC
+		}
+		return plan, nil
+	}
+	indexRuleName := ud.order.Index.GetMetadata().GetName()
+	temp.OrderBy = &modelv1.QueryOrder{
+		IndexRuleName: indexRuleName,
+		Sort:          ud.order.Sort,
+	}
+	ok, indexRule := s.IndexRuleDefined(indexRuleName)
+	if !ok {
+		return nil, fmt.Errorf("index rule %s not found", indexRuleName)
+	}
+	if len(indexRule.Tags) != 1 {
+		return nil, fmt.Errorf("index rule %s should have only one tag", indexRuleName)
+	}
+	sortTagSpec := s.FindTagSpecByName(indexRule.Tags[0])
+	if sortTagSpec == nil {
+		return nil, fmt.Errorf("tag %s not found", indexRule.Tags[0])
+	}
+	return &distributedPlan{
+		queryTemplate: temp,
+		s:             s,
+		sortByTime:    false,
+		sortTagSpec:   *sortTagSpec,
+		desc:          ud.order.Sort == modelv1.Sort_SORT_DESC,
+	}, nil
+}
+
+// distributedPlan broadcasts queryTemplate and merge-sorts the replies.
+type distributedPlan struct {
+	s             logical.Schema
+	queryTemplate *measurev1.QueryRequest
+	sortTagSpec   logical.TagSpec
+	sortByTime    bool
+	desc          bool
+}
+
+// Execute broadcasts the query to the data nodes and merges their data points
+// into one iterator ordered by the plan's sort key. Failed replies and replies
+// of an unexpected type are skipped and reported through the returned error,
+// which may be non-nil alongside a usable iterator.
+func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, error) {
+	dctx := executor.FromDistributedExecutionContext(ctx)
+	query := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest)
+	query.TimeRange = dctx.TimeRange()
+	ff, err := dctx.Broadcast(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+	if err != nil {
+		return nil, err
+	}
+	var allErr error
+	var see []sort.Iterator[*comparableDataPoint]
+	for _, f := range ff {
+		m, getErr := f.Get()
+		if getErr != nil {
+			allErr = multierr.Append(allErr, getErr)
+			continue
+		}
+		resp, ok := m.Data().(*measurev1.QueryResponse)
+		if !ok {
+			allErr = multierr.Append(allErr, fmt.Errorf("unexpected response type %T", m.Data()))
+			continue
+		}
+		see = append(see, newSortableElements(resp.GetDataPoints(), t.sortByTime, t.sortTagSpec))
+	}
+	return &sortedMIterator{
+		Iterator: sort.NewItemIter[*comparableDataPoint](see, t.desc),
+	}, allErr
+}
+
+func (t *distributedPlan) String() string {
+	return fmt.Sprintf("distributed:%s", t.queryTemplate.String())
+}
+
+func (t *distributedPlan) Children() []logical.Plan {
+	return []logical.Plan{}
+}
+
+func (t *distributedPlan) Schema() logical.Schema {
+	return t.s
+}
+
+var _ sort.Comparable = (*comparableDataPoint)(nil)
+
+// comparableDataPoint pairs a data point with the byte-encoded key it is
+// merged by.
+type comparableDataPoint struct {
+	*measurev1.DataPoint
+	sortField []byte
+}
+
+// newComparableElement builds the sort key of e: its timestamp in nanoseconds
+// when sortByTime is set, otherwise the marshaled value of the tag located by
+// sortTagSpec. An out-of-range tag position is reported as an error rather
+// than panicking.
+func newComparableElement(e *measurev1.DataPoint, sortByTime bool, sortTagSpec logical.TagSpec) (*comparableDataPoint, error) {
+	var sortField []byte
+	if sortByTime {
+		sortField = convert.Uint64ToBytes(uint64(e.Timestamp.AsTime().UnixNano()))
+	} else {
+		// NOTE(review): the indexes come from the schema while a reply's tag
+		// families follow the query's tag projection; confirm the orders agree.
+		tagFamilies := e.GetTagFamilies()
+		if sortTagSpec.TagFamilyIdx < 0 || sortTagSpec.TagFamilyIdx >= len(tagFamilies) {
+			return nil, fmt.Errorf("tag family index %d out of range", sortTagSpec.TagFamilyIdx)
+		}
+		tags := tagFamilies[sortTagSpec.TagFamilyIdx].GetTags()
+		if sortTagSpec.TagIdx < 0 || sortTagSpec.TagIdx >= len(tags) {
+			return nil, fmt.Errorf("tag index %d out of range", sortTagSpec.TagIdx)
+		}
+		var err error
+		sortField, err = pbv1.MarshalTagValue(tags[sortTagSpec.TagIdx].Value)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &comparableDataPoint{
+		DataPoint: e,
+		sortField: sortField,
+	}, nil
+}
+
+func (e *comparableDataPoint) SortedField() []byte {
+	return e.sortField
+}
+
+var _ sort.Iterator[*comparableDataPoint] = (*sortableElements)(nil)
+
+// sortableElements iterates the data points of one data node's reply in the
+// order they were returned.
+type sortableElements struct {
+	cur          *comparableDataPoint
+	dataPoints   []*measurev1.DataPoint
+	sortTagSpec  logical.TagSpec
+	index        int
+	isSortByTime bool
+}
+
+func newSortableElements(dataPoints []*measurev1.DataPoint, isSortByTime bool, sortTagSpec logical.TagSpec) *sortableElements {
+	return &sortableElements{
+		dataPoints:   dataPoints,
+		isSortByTime: isSortByTime,
+		sortTagSpec:  sortTagSpec,
+	}
+}
+
+// Close is a no-op: the data points are already in memory.
+func (*sortableElements) Close() error {
+	return nil
+}
+
+// Next advances to the next data point whose sort key can be built.
+func (s *sortableElements) Next() bool {
+	return s.iter(func(e *measurev1.DataPoint) (*comparableDataPoint, error) {
+		return newComparableElement(e, s.isSortByTime, s.sortTagSpec)
+	})
+}
+
+// Val returns the data point selected by the last successful Next.
+func (s *sortableElements) Val() *comparableDataPoint {
+	return s.cur
+}
+
+// iter moves to the next data point fn accepts, skipping the ones it rejects.
+// It reports true whenever s.cur was updated, so the final data point is
+// yielded as well.
+func (s *sortableElements) iter(fn func(*measurev1.DataPoint) (*comparableDataPoint, error)) bool {
+	for s.index < len(s.dataPoints) {
+		cur, err := fn(s.dataPoints[s.index])
+		s.index++
+		if err != nil {
+			continue
+		}
+		s.cur = cur
+		return true
+	}
+	return false
+}
+
+var _ executor.MIterator = (*sortedMIterator)(nil)
+
+// sortedMIterator adapts the merged iterator to executor.MIterator, exposing
+// one data point per step.
+type sortedMIterator struct {
+	sort.Iterator[*comparableDataPoint]
+}
+
+func (s *sortedMIterator) Current() []*measurev1.DataPoint {
+	return []*measurev1.DataPoint{s.Val().DataPoint}
+}
diff --git a/pkg/query/logical/measure/measure_plan_groupby.go b/pkg/query/logical/measure/measure_plan_groupby.go
index 60a907a5..9f62565d 100644
--- a/pkg/query/logical/measure/measure_plan_groupby.go
+++ b/pkg/query/logical/measure/measure_plan_groupby.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+	"context"
 	"fmt"
 	"math"
 
@@ -101,14 +102,14 @@ func (g *groupBy) Schema() logical.Schema {
 	return g.schema.ProjTags(g.groupByTagsRefs...)
} -func (g *groupBy) Execute(ec executor.MeasureExecutionContext) (executor.MIterator, error) { +// Execute groups with sort when groupByEntity is set and with hash otherwise. +func (g *groupBy) Execute(ec context.Context) (executor.MIterator, error) { if g.groupByEntity { return g.sort(ec) } return g.hash(ec) } -func (g *groupBy) sort(ec executor.MeasureExecutionContext) (executor.MIterator, error) { +func (g *groupBy) sort(ec context.Context) (executor.MIterator, error) { iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec) if err != nil { return nil, err @@ -116,7 +117,7 @@ func (g *groupBy) sort(ec executor.MeasureExecutionContext) (executor.MIterator, return newGroupSortIterator(iter, g.groupByTagsRefs), nil } -func (g *groupBy) hash(ec executor.MeasureExecutionContext) (mit executor.MIterator, err error) { +func (g *groupBy) hash(ec context.Context) (mit executor.MIterator, err error) { iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec) if err != nil { return nil, err diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index 1b45fdfd..7d9a1fa8 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -31,6 +31,7 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/iter/sort" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" @@ -76,7 +77,7 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema) (logical.Plan, error) // fill AnyEntry by default entity[idx] = tsdb.AnyEntry } - filter, entities, err := logical.BuildLocalFilter(uis.criteria, s, entityMap, entity) + filter, entities, err := logical.BuildLocalFilter(uis.criteria, s, entityMap, entity,
true) if err != nil { return nil, err } @@ -122,7 +123,7 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) { i.order = order } -func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (mit executor.MIterator, err error) { +func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, err error) { var orderBy *tsdb.OrderBy if i.order.Index != nil { orderBy = &tsdb.OrderBy{ @@ -130,6 +131,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (mit execu Sort: i.order.Sort, } } + // shard lookups go through the measure execution context extracted from ctx + ec := executor.FromMeasureExecutionContext(ctx) var seriesList tsdb.SeriesList for _, e := range i.entities { shards, errInternal := ec.Shards(e) @@ -139,7 +141,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (mit execu for _, shard := range shards { sl, errInternal := shard.Series().Search( context.WithValue( - context.Background(), + ctx, logger.ContextKey, i.l, ), @@ -163,7 +165,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (mit execu }) } // CAVEAT: the order of series list matters when sorting by an index. - iters, closers, err := logical.ExecuteForShard(i.l, seriesList, i.timeRange, builders...) + iters, closers, err := logical.ExecuteForShard(ctx, i.l, seriesList, i.timeRange, builders...)
if err != nil { return nil, err } @@ -224,7 +226,7 @@ func indexScan(startTime, endTime time.Time, metadata *commonv1.Metadata, projec var _ executor.MIterator = (*indexScanIterator)(nil) type indexScanIterator struct { - inner logical.ItemIterator + inner sort.Iterator[tsdb.Item] err error current *measurev1.DataPoint context transformContext @@ -232,7 +234,7 @@ type indexScanIterator struct { num int } -func newIndexScanIterator(inner logical.ItemIterator, context transformContext, max int) executor.MIterator { +func newIndexScanIterator(inner sort.Iterator[tsdb.Item], context transformContext, max int) executor.MIterator { return &indexScanIterator{ inner: inner, context: context, @@ -241,10 +243,10 @@ func newIndexScanIterator(inner logical.ItemIterator, context transformContext, } func (ism *indexScanIterator) Next() bool { - if !ism.inner.HasNext() || ism.err != nil || ism.num > ism.max { + // NOTE(review): inner.Next() now advances before err/max are checked, so one inner item is consumed even when false is returned; Val() is read only after Next() succeeds. + if !ism.inner.Next() || ism.err != nil || ism.num > ism.max { return false } - nextItem := ism.inner.Next() + nextItem := ism.inner.Val() var err error if ism.current, err = transform(nextItem, ism.context); err != nil { ism.err = multierr.Append(ism.err, err) diff --git a/pkg/query/logical/measure/measure_plan_top.go b/pkg/query/logical/measure/measure_plan_top.go index 194f8e48..25fc5062 100644 --- a/pkg/query/logical/measure/measure_plan_top.go +++ b/pkg/query/logical/measure/measure_plan_top.go @@ -18,6 +18,7 @@ package measure import ( + "context" "fmt" "github.com/pkg/errors" @@ -90,7 +91,7 @@ func (g *topOp) Schema() logical.Schema { return g.Input.Schema() } -func (g *topOp) Execute(ec executor.MeasureExecutionContext) (mit executor.MIterator, err error) { +func (g *topOp) Execute(ec context.Context) (mit executor.MIterator, err error) { iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec) if err != nil { return nil, err diff --git a/pkg/query/logical/stream/stream_analyzer.go b/pkg/query/logical/stream/stream_analyzer.go index 4771eeab..919ac9ea
100644 --- a/pkg/query/logical/stream/stream_analyzer.go +++ b/pkg/query/logical/stream/stream_analyzer.go @@ -22,8 +22,8 @@ import ( "fmt" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" - "github.com/apache/skywalking-banyandb/banyand/stream" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" ) @@ -31,12 +31,10 @@ import ( const defaultLimit uint32 = 20 // BuildSchema returns Schema loaded from the metadata repository. -func BuildSchema(streamSchema stream.Stream) (logical.Schema, error) { - sm := streamSchema.GetSchema() - +func BuildSchema(sm *databasev1.Stream, indexRules []*databasev1.IndexRule) (logical.Schema, error) { s := &schema{ common: &logical.CommonSchema{ - IndexRules: streamSchema.GetIndexRules(), + IndexRules: indexRules, TagSpecMap: make(map[string]*logical.TagSpec), EntityList: sm.GetEntity().GetTagNames(), }, @@ -75,6 +75,22 @@ func Analyze(_ context.Context, criteria *streamv1.QueryRequest, metadata *commo return p, nil } +// DistributedAnalyze builds the liaison-side plan of a stream query: the +// distributed plan fetches elements from the data nodes, then offset and +// limit (defaultLimit when the request sets none) are applied to the merged +// elements.
+func DistributedAnalyze(criteria *streamv1.QueryRequest, s logical.Schema) (logical.Plan, error) { + // innermost plan: fan the query out to the data nodes and merge the replies + plan := newUnresolvedDistributed(criteria) + // skip the first offset elements of the merged result + plan = newOffset(plan, criteria.GetOffset()) + + // keep at most limitParameter elements, defaulting to defaultLimit + limitParameter := criteria.GetLimit() + if limitParameter == 0 { + limitParameter = defaultLimit + } + plan = newLimit(plan, limitParameter) + return plan.Analyze(s) +} + var ( _ logical.Plan = (*limit)(nil) _ logical.UnresolvedPlan = (*limit)(nil) @@ -93,7 +107,7 @@ type limit struct { LimitNum uint32 } -func (l *limit) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element, error) { +func (l *limit) Execute(ec context.Context) ([]*streamv1.Element, error) { entities, err := l.Parent.Input.(executor.StreamExecutable).Execute(ec) if err != nil { return nil, err @@ -146,7 +160,7 @@ type offset struct { offsetNum uint32 } -func (l *offset) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element, error) { +func (l *offset) Execute(ec context.Context) ([]*streamv1.Element, error) { elements, err := l.Parent.Input.(executor.StreamExecutable).Execute(ec) if err != nil { return nil, err diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go new file mode 100644 index 00000000..7fce94cc --- /dev/null +++ b/pkg/query/logical/stream/stream_plan_distributed.go @@ -0,0 +1,214 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"context"
+	"fmt"
+
+	"go.uber.org/multierr"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-banyandb/api/data"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/iter/sort"
+	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/executor"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
+
+// unresolvedDistributed is the liaison-side plan of a stream query that is
+// broadcast to data nodes.
+type unresolvedDistributed struct {
+	originalQuery *streamv1.QueryRequest
+}
+
+func newUnresolvedDistributed(query *streamv1.QueryRequest) logical.UnresolvedPlan {
+	return &unresolvedDistributed{
+		originalQuery: query,
+	}
+}
+
+// Analyze validates the projection and builds the query template sent to
+// every data node. Replies are merged by timestamp unless the order names an
+// index rule, in which case they are merged by that rule's single tag.
+func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) {
+	if ud.originalQuery.Projection == nil {
+		return nil, fmt.Errorf("projection is required")
+	}
+	limit := ud.originalQuery.GetLimit()
+	if limit == 0 {
+		limit = defaultLimit
+	}
+	// The offset is applied after merging, so every data node has to return
+	// offset+limit elements. Saturate instead of wrapping on overflow.
+	fetch := limit + ud.originalQuery.GetOffset()
+	if fetch < limit {
+		fetch = ^uint32(0)
+	}
+	temp := &streamv1.QueryRequest{
+		Projection: ud.originalQuery.Projection,
+		Metadata:   ud.originalQuery.Metadata,
+		Criteria:   ud.originalQuery.Criteria,
+		Limit:      fetch,
+		OrderBy:    ud.originalQuery.OrderBy,
+	}
+	orderBy := ud.originalQuery.GetOrderBy()
+	desc := orderBy.GetSort() == modelv1.Sort_SORT_DESC
+	// No order, or an order without an index rule, means "order by timestamp"
+	// in the requested direction.
+	if orderBy.GetIndexRuleName() == "" {
+		return &distributedPlan{
+			queryTemplate: temp,
+			s:             s,
+			sortByTime:    true,
+			desc:          desc,
+		}, nil
+	}
+	indexRuleName := orderBy.GetIndexRuleName()
+	ok, indexRule := s.IndexRuleDefined(indexRuleName)
+	if !ok {
+		return nil, fmt.Errorf("index rule %s not found", indexRuleName)
+	}
+	if len(indexRule.Tags) != 1 {
+		return nil, fmt.Errorf("index rule %s should have only one tag", indexRuleName)
+	}
+	sortTagSpec := s.FindTagSpecByName(indexRule.Tags[0])
+	if sortTagSpec == nil {
+		return nil, fmt.Errorf("tag %s not found", indexRule.Tags[0])
+	}
+	return &distributedPlan{
+		queryTemplate: temp,
+		s:             s,
+		sortByTime:    false,
+		sortTagSpec:   *sortTagSpec,
+		desc:          desc,
+	}, nil
+}
+
+// distributedPlan broadcasts queryTemplate and merge-sorts the replies.
+type distributedPlan struct {
+	s             logical.Schema
+	queryTemplate *streamv1.QueryRequest
+	sortTagSpec   logical.TagSpec
+	sortByTime    bool
+	desc          bool
+}
+
+// Execute broadcasts the query to the data nodes and returns their elements
+// merged by the plan's sort key. Failed replies and replies of an unexpected
+// type are skipped and reported through the returned error.
+func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, error) {
+	dctx := executor.FromDistributedExecutionContext(ctx)
+	query := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
+	query.TimeRange = dctx.TimeRange()
+	// Stream queries are served on the stream query topic.
+	ff, err := dctx.Broadcast(data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+	if err != nil {
+		return nil, err
+	}
+	var allErr error
+	var see []sort.Iterator[*comparableElement]
+	for _, f := range ff {
+		m, getErr := f.Get()
+		if getErr != nil {
+			allErr = multierr.Append(allErr, getErr)
+			continue
+		}
+		resp, ok := m.Data().(*streamv1.QueryResponse)
+		if !ok {
+			allErr = multierr.Append(allErr, fmt.Errorf("unexpected response type %T", m.Data()))
+			continue
+		}
+		see = append(see, newSortableElements(resp.GetElements(), t.sortByTime, t.sortTagSpec))
+	}
+	iter := sort.NewItemIter[*comparableElement](see, t.desc)
+	var result []*streamv1.Element
+	for iter.Next() {
+		result = append(result, iter.Val().Element)
+	}
+	allErr = multierr.Append(allErr, iter.Close())
+	return result, allErr
+}
+
+func (t *distributedPlan) String() string {
+	return fmt.Sprintf("distributed:%s", t.queryTemplate.String())
+}
+
+func (t *distributedPlan) Children() []logical.Plan {
+	return []logical.Plan{}
+}
+
+func (t *distributedPlan) Schema() logical.Schema {
+	return t.s
+}
+
+var _ sort.Comparable = (*comparableElement)(nil)
+
+// comparableElement pairs an element with the byte-encoded key it is merged by.
+type comparableElement struct {
+	*streamv1.Element
+	sortField []byte
+}
+
+// newComparableElement builds the sort key of e: its timestamp in nanoseconds
+// when sortByTime is set, otherwise the marshaled value of the tag located by
+// sortTagSpec. An out-of-range tag position is reported as an error rather
+// than panicking.
+func newComparableElement(e *streamv1.Element, sortByTime bool, sortTagSpec logical.TagSpec) (*comparableElement, error) {
+	var sortField []byte
+	if sortByTime {
+		sortField = convert.Uint64ToBytes(uint64(e.Timestamp.AsTime().UnixNano()))
+	} else {
+		// NOTE(review): the indexes come from the schema while a reply's tag
+		// families follow the query's projection; confirm the orders agree.
+		tagFamilies := e.GetTagFamilies()
+		if sortTagSpec.TagFamilyIdx < 0 || sortTagSpec.TagFamilyIdx >= len(tagFamilies) {
+			return nil, fmt.Errorf("tag family index %d out of range", sortTagSpec.TagFamilyIdx)
+		}
+		tags := tagFamilies[sortTagSpec.TagFamilyIdx].GetTags()
+		if sortTagSpec.TagIdx < 0 || sortTagSpec.TagIdx >= len(tags) {
+			return nil, fmt.Errorf("tag index %d out of range", sortTagSpec.TagIdx)
+		}
+		var err error
+		sortField, err = pbv1.MarshalTagValue(tags[sortTagSpec.TagIdx].Value)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &comparableElement{
+		Element:   e,
+		sortField: sortField,
+	}, nil
+}
+
+func (e *comparableElement) SortedField() []byte {
+	return e.sortField
+}
+
+var _ sort.Iterator[*comparableElement] = (*sortableElements)(nil)
+
+// sortableElements iterates the elements of one data node's reply in the
+// order they were returned.
+type sortableElements struct {
+	cur          *comparableElement
+	elements     []*streamv1.Element
+	sortTagSpec  logical.TagSpec
+	index        int
+	isSortByTime bool
+}
+
+func newSortableElements(elements []*streamv1.Element, isSortByTime bool, sortTagSpec logical.TagSpec) *sortableElements {
+	return &sortableElements{
+		elements:     elements,
+		isSortByTime: isSortByTime,
+		sortTagSpec:  sortTagSpec,
+	}
+}
+
+// Close is a no-op: the elements are already in memory.
+func (*sortableElements) Close() error {
+	return nil
+}
+
+// Next advances to the next element whose sort key can be built.
+func (s *sortableElements) Next() bool {
+	return s.iter(func(e *streamv1.Element) (*comparableElement, error) {
+		return newComparableElement(e, s.isSortByTime, s.sortTagSpec)
+	})
+}
+
+// Val returns the element selected by the last successful Next.
+func (s *sortableElements) Val() *comparableElement {
+	return s.cur
+}
+
+// iter moves to the next element fn accepts, skipping the ones it rejects.
+// It reports true whenever s.cur was updated, so the final element is
+// yielded as well.
+func (s *sortableElements) iter(fn func(*streamv1.Element) (*comparableElement, error)) bool {
+	for s.index < len(s.elements) {
+		cur, err := fn(s.elements[s.index])
+		s.index++
+		if err != nil {
+			continue
+		}
+		s.cur = cur
+		return true
+	}
+	return false
+}
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_global.go b/pkg/query/logical/stream/stream_plan_indexscan_global.go
index 4ec4fc39..ff1abb0c
100644 --- a/pkg/query/logical/stream/stream_plan_indexscan_global.go +++ b/pkg/query/logical/stream/stream_plan_indexscan_global.go @@ -59,14 +59,15 @@ func (t *globalIndexScan) Schema() logical.Schema { return t.schema } -func (t *globalIndexScan) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element, error) { +// Execute scans every shard returned by ec.Shards(nil); on a shard error it returns the elements collected so far together with that error. +func (t *globalIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, error) { + ec := executor.FromStreamExecutionContext(ctx) shards, err := ec.Shards(nil) if err != nil { return nil, err } var elements []*streamv1.Element for _, shard := range shards { - elementsInShard, shardErr := t.executeForShard(ec, shard) + elementsInShard, shardErr := t.executeForShard(ctx, ec, shard) if shardErr != nil { return elements, shardErr } @@ -75,7 +76,7 @@ func (t *globalIndexScan) Execute(ec executor.StreamExecutionContext) ([]*stream return elements, nil } -func (t *globalIndexScan) executeForShard(ec executor.StreamExecutionContext, shard tsdb.Shard) ([]*streamv1.Element, error) { +func (t *globalIndexScan) executeForShard(ctx context.Context, ec executor.StreamExecutionContext, shard tsdb.Shard) ([]*streamv1.Element, error) { var elementsInShard []*streamv1.Element for _, term := range t.expr.Bytes() { itemIDs, err := shard.Index().Seek(index.Field{ @@ -98,9 +99,9 @@ func (t *globalIndexScan) executeForShard(ec executor.StreamExecutionContext, sh return elementsInShard, errors.WithStack(err) } err = func() error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + // each item lookup gets a 5s deadline derived from the caller's ctx, so cancelling the query also cancels the lookup + ctxSeries, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - item, closer, errInner := series.Get(ctx, itemID) + item, closer, errInner := series.Get(ctxSeries, itemID) defer func(closer io.Closer) { if closer != nil { _ = closer.Close() diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go index 5970e0f8..c011332d 100644 --- 
a/pkg/query/logical/stream/stream_plan_indexscan_local.go +++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go @@ -62,8 +62,9 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) { i.order = order } -func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) (elements []*streamv1.Element, err error) { +func (i *localIndexScan) Execute(ctx context.Context) (elements []*streamv1.Element, err error) { var seriesList tsdb.SeriesList + ec := executor.FromStreamExecutionContext(ctx) for _, e := range i.entities { shards, errInternal := ec.Shards(e) if errInternal != nil { @@ -71,7 +72,7 @@ func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) (elements [ } for _, shard := range shards { sl, errInternal := shard.Series().List(context.WithValue( - context.Background(), + ctx, logger.ContextKey, i.l, ), tsdb.NewPath(e)) @@ -99,7 +100,7 @@ func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) (elements [ b.Filter(i.filter) }) } - iters, closers, err := logical.ExecuteForShard(i.l, seriesList, i.timeRange, builders...) + iters, closers, err := logical.ExecuteForShard(ctx, i.l, seriesList, i.timeRange, builders...)
if err != nil { return nil, err } @@ -121,8 +122,8 @@ func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) (elements [ defer func() { err = multierr.Append(err, it.Close()) }() - for it.HasNext() { - nextItem := it.Next() + // Next advances the iterator; Val reads the item it moved to + for it.Next() { + nextItem := it.Val() tagFamilies, innerErr := logical.ProjectItem(ec, nextItem, i.projectionTagRefs) if innerErr != nil { return nil, innerErr diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go b/pkg/query/logical/stream/stream_plan_tag_filter.go index 4ea0b3bf..f6e2f3f0 100644 --- a/pkg/query/logical/stream/stream_plan_tag_filter.go +++ b/pkg/query/logical/stream/stream_plan_tag_filter.go @@ -18,6 +18,7 @@ package stream import ( + "context" "errors" "fmt" "time" @@ -58,7 +59,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) (logical.Plan, error) entity[idx] = tsdb.AnyEntry } var err error - ctx.filter, ctx.entities, err = logical.BuildLocalFilter(uis.criteria, s, entityDict, entity) + ctx.filter, ctx.entities, err = logical.BuildLocalFilter(uis.criteria, s, entityDict, entity, false) if err != nil { var ge logical.GlobalIndexError if errors.As(err, &ge) { @@ -161,7 +162,7 @@ func newTagFilter(s logical.Schema, parent logical.Plan, tagFilter logical.TagFi } } -func (t *tagFilterPlan) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element, error) { +func (t *tagFilterPlan) Execute(ec context.Context) ([]*streamv1.Element, error) { entities, err := t.parent.(executor.StreamExecutable).Execute(ec) if err != nil { return nil, err diff --git a/pkg/run/channel_closer.go b/pkg/run/channel_closer.go index 54c62707..fa1af90d 100644 --- a/pkg/run/channel_closer.go +++ b/pkg/run/channel_closer.go @@ -101,6 +101,7 @@ func (c *ChannelCloser) CloseThenWait() { return } + // NOTE(review): cancel now runs before senders are drained, presumably so senders blocked in a select on CloseNotify can return; confirm + c.cancel() c.lock.Lock() c.closed = true c.lock.Unlock() @@ -108,7 +109,6 @@ func (c *ChannelCloser) CloseThenWait() { c.sender.Done() c.sender.Wait() - c.cancel() c.receiver.Done() c.receiver.Wait() } diff --git
a/pkg/schema/metadata.go b/pkg/schema/metadata.go index 0b4cc94a..ce96cf39 100644 --- a/pkg/schema/metadata.go +++ b/pkg/schema/metadata.go @@ -15,13 +15,11 @@ // specific language governing permissions and limitations // under the License. -// Package schema implements a framework to sync schema info from the metadata repository. package schema import ( "context" "io" - "math" "sync" "sync/atomic" "time" @@ -38,78 +36,56 @@ import ( "github.com/apache/skywalking-banyandb/pkg/run" ) -// EventType defines actions of events. -type EventType uint8 +var _ Resource = (*resourceSpec)(nil) -// EventType support Add/Update and Delete. -// All events are idempotent. -const ( - EventAddOrUpdate EventType = iota - EventDelete -) - -// EventKind defines category of events. -type EventKind uint8 - -// This framework groups events to a hierarchy. A group is the root node. -const ( - EventKindGroup EventKind = iota - EventKindResource -) +// resourceSpec implements Resource: a schema, the closer it delegates Close to, and the index rules and TopN aggregations bound to the schema. +type resourceSpec struct { + schema ResourceSchema + delegated io.Closer + indexRules []*databasev1.IndexRule + aggregations []*databasev1.TopNAggregation +} -// Group is the root node, allowing get resources from its sub nodes. -type Group interface { - GetSchema() *commonv1.Group - StoreResource(ctx context.Context, resourceSchema ResourceSchema) (Resource, error) - LoadResource(name string) (Resource, bool) +func (rs *resourceSpec) Delegated() io.Closer { + return rs.delegated } -// MetadataEvent is the syncing message between metadata repo and this framework. -type MetadataEvent struct { - Metadata *commonv1.Metadata - Typ EventType - Kind EventKind +func (rs *resourceSpec) Close() error { + return rs.delegated.Close() } -// ResourceSchema allows get the metadata. -type ResourceSchema interface { - GetMetadata() *commonv1.Metadata +func (rs *resourceSpec) Schema() ResourceSchema { + return rs.schema } -// ResourceSpec wraps required fields to open a resource.
-type ResourceSpec struct { - Schema ResourceSchema - // IndexRules are index rules bound to the Schema - IndexRules []*databasev1.IndexRule - // Aggregations are topN aggregation bound to the Schema, e.g. TopNAggregation - Aggregations []*databasev1.TopNAggregation +func (rs *resourceSpec) IndexRules() []*databasev1.IndexRule { + return rs.indexRules } -// Resource allows access metadata from a local cache. -type Resource interface { - GetIndexRules() []*databasev1.IndexRule - GetTopN() []*databasev1.TopNAggregation - MaxObservedModRevision() int64 - ResourceSchema - io.Closer +func (rs *resourceSpec) TopN() []*databasev1.TopNAggregation { + return rs.aggregations } -// ResourceSupplier allows open a resource and its embedded tsdb. -type ResourceSupplier interface { - OpenResource(shardNum uint32, db tsdb.Supplier, spec ResourceSpec) (Resource, error) - ResourceSchema(metdata *commonv1.Metadata) (ResourceSchema, error) - OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) +func (rs *resourceSpec) maxRevision() int64 { + return rs.schema.GetMetadata().GetModRevision() } -// Repository is the collection of several hierarchies groups by a "Group".
-type Repository interface { - Watcher() - SendMetadataEvent(MetadataEvent) - StoreGroup(groupMeta *commonv1.Metadata) (*group, error) - LoadGroup(name string) (Group, bool) - LoadResource(metadata *commonv1.Metadata) (Resource, bool) - Close() - StopCh() <-chan struct{} +// isNewThan reports false when other has a higher schema revision, a different number of index rules or aggregations, or a higher max revision among them; otherwise true. +func (rs *resourceSpec) isNewThan(other *resourceSpec) bool { + if other.maxRevision() > rs.maxRevision() { + return false + } + if len(rs.indexRules) != len(other.indexRules) { + return false + } + if len(rs.aggregations) != len(other.aggregations) { + return false + } + if parseMaxModRevision(other.indexRules) > parseMaxModRevision(rs.indexRules) { + return false + } + if parseMaxModRevision(other.aggregations) > parseMaxModRevision(rs.aggregations) { + return false + } + return true } const defaultWorkerNum = 10 @@ -117,17 +93,28 @@ const defaultWorkerNum = 10 var _ Repository = (*schemaRepo)(nil) type schemaRepo struct { - metadata metadata.Repo - resourceSupplier ResourceSupplier - l *logger.Logger - data map[string]*group - workerCloser *run.Closer - closer *run.Closer - eventCh chan MetadataEvent - workerNum int + metadata metadata.Repo + resourceSupplier ResourceSupplier + resourceSchemaSupplier ResourceSchemaSupplier + l *logger.Logger + data map[string]*group + closer *run.ChannelCloser + eventCh chan MetadataEvent + workerNum int sync.RWMutex } +// SendMetadataEvent queues event on eventCh; the event is dropped if the closer no longer accepts senders or closes while the send is blocked. +func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) { + if !sr.closer.AddSender() { + return + } + defer sr.closer.SenderDone() + select { + case sr.eventCh <- event: + case <-sr.closer.CloseNotify(): + } +} +
func (sr *schemaRepo) StopCh() <-chan struct{} { return sr.closer.CloseNotify() @@ -140,26 +127,42 @@ func NewRepository( resourceSupplier ResourceSupplier, ) Repository { return &schemaRepo{ - metadata: metadata, - l: l, - resourceSupplier: resourceSupplier, - data: make(map[string]*group), - eventCh: make(chan MetadataEvent, defaultWorkerNum), - workerNum: defaultWorkerNum, - workerCloser: run.NewCloser(defaultWorkerNum), - closer: run.NewCloser(1), + metadata: metadata, + l: l, + resourceSupplier: resourceSupplier, + resourceSchemaSupplier: resourceSupplier, + data: make(map[string]*group), + eventCh: make(chan MetadataEvent, defaultWorkerNum), + workerNum: defaultWorkerNum, + closer: run.NewChannelCloser(), } } -func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) { - sr.eventCh <- event +// NewPortableRepository return a new Repository without tsdb. +func NewPortableRepository( + metadata metadata.Repo, + l *logger.Logger, + supplier ResourceSchemaSupplier, +) Repository { + return &schemaRepo{ + metadata: metadata, + l: l, + resourceSchemaSupplier: supplier, + data: make(map[string]*group), + eventCh: make(chan MetadataEvent, defaultWorkerNum), + workerNum: defaultWorkerNum, + closer: run.NewChannelCloser(), + } } func (sr *schemaRepo) Watcher() { for i := 0; i < sr.workerNum; i++ { go func() { + if !sr.closer.AddReceiver() { + return + } defer func() { - sr.workerCloser.Done() + sr.closer.ReceiverDone() if err := recover(); err != nil { sr.l.Warn().Interface("err", err).Msg("watching the events") } @@ -180,7 +183,7 @@ func (sr *schemaRepo) Watcher() { case EventKindGroup: _, err = sr.StoreGroup(evt.Metadata) case EventKindResource: - _, err = sr.storeResource(evt.Metadata) + err = sr.storeResource(evt.Metadata) } case EventDelete: switch evt.Kind { @@ -191,14 +194,15 @@ func (sr *schemaRepo) Watcher() { } } if err != nil && !errors.Is(err, schema.ErrClosed) { - sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. 
retry...") select { - case sr.eventCh <- evt: - case <-sr.workerCloser.CloseNotify(): + case <-sr.closer.CloseNotify(): return + default: } + sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...") + sr.SendMetadataEvent(evt) } - case <-sr.workerCloser.CloseNotify(): + case <-sr.closer.CloseNotify(): return } } @@ -207,27 +211,30 @@ func (sr *schemaRepo) Watcher() { } func (sr *schemaRepo) StoreGroup(groupMeta *commonv1.Metadata) (*group, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - groupSchema, err := sr.metadata.GroupRegistry().GetGroup(ctx, groupMeta.GetName()) - if err != nil { - return nil, err - } - name := groupSchema.GetMetadata().GetName() + name := groupMeta.GetName() sr.Lock() defer sr.Unlock() g, ok := sr.getGroup(name) if !ok { sr.l.Info().Str("group", name).Msg("creating a tsdb") - var db tsdb.Database - db, err = sr.resourceSupplier.OpenDB(groupSchema) - if err != nil { + g = sr.createGroup(name) + if err := g.init(name); err != nil { return nil, err } - g = newGroup(groupSchema, sr.metadata, db, sr.l, sr.resourceSupplier) - sr.data[name] = g return g, nil } + if !g.isInit() { + if err := g.init(name); err != nil { + return nil, err + } + return g, nil + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + groupSchema, err := g.metadata.GroupRegistry().GetGroup(ctx, name) + if err != nil { + return nil, err + } prevGroupSchema := g.GetSchema() if groupSchema.GetMetadata().GetModRevision() <= prevGroupSchema.Metadata.ModRevision { return g, nil @@ -236,15 +243,22 @@ func (sr *schemaRepo) StoreGroup(groupMeta *commonv1.Metadata) (*group, error) { db := g.SupplyTSDB() db.Close() sr.l.Info().Str("group", name).Msg("creating a new tsdb") - newDB, err := sr.resourceSupplier.OpenDB(groupSchema) - if err != nil { + if err := g.init(name); err != nil { return nil, err } - g.setDB(newDB) - g.groupSchema.Store(groupSchema) return 
g, nil } +func (sr *schemaRepo) createGroup(name string) (g *group) { + if sr.resourceSupplier != nil { + g = newGroup(sr.metadata, sr.l, sr.resourceSupplier) + } else { + g = newPortableGroup(sr.metadata, sr.l) + } + sr.data[name] = g + return +} + func (sr *schemaRepo) deleteGroup(groupMeta *commonv1.Metadata) error { name := groupMeta.GetName() sr.Lock() @@ -273,7 +287,11 @@ func (sr *schemaRepo) getGroup(name string) (*group, bool) { func (sr *schemaRepo) LoadGroup(name string) (Group, bool) { sr.RLock() defer sr.RUnlock() - return sr.getGroup(name) + g, ok := sr.getGroup(name) + if !ok { + return nil, false + } + return g, g.isInit() } func (sr *schemaRepo) LoadResource(metadata *commonv1.Metadata) (Resource, bool) { @@ -284,19 +302,26 @@ func (sr *schemaRepo) LoadResource(metadata *commonv1.Metadata) (Resource, bool) return g.LoadResource(metadata.Name) } -func (sr *schemaRepo) storeResource(metadata *commonv1.Metadata) (Resource, error) { - group, ok := sr.LoadGroup(metadata.Group) +func (sr *schemaRepo) storeResource(metadata *commonv1.Metadata) error { + g, ok := sr.LoadGroup(metadata.Group) if !ok { var err error - if group, err = sr.StoreGroup(&commonv1.Metadata{Name: metadata.Group}); err != nil { - return nil, errors.WithMessagef(err, "create unknown group:%s", metadata.Group) + if g, err = sr.StoreGroup(&commonv1.Metadata{Name: metadata.Group}); err != nil { + return errors.WithMessagef(err, "create unknown group:%s", metadata.Group) } } - stm, err := sr.resourceSupplier.ResourceSchema(metadata) + stm, err := sr.resourceSchemaSupplier.ResourceSchema(metadata) if err != nil { - return nil, errors.WithMessage(err, "fails to get the resource") + if errors.Is(err, schema.ErrGRPCResourceNotFound) { + if dl := sr.l.Debug(); dl.Enabled() { + dl.Interface("metadata", metadata).Msg("resource not found") + } + return nil + } + return errors.WithMessage(err, "fails to get the resource") } - return group.StoreResource(context.Background(), stm) + _, err = 
g.(*group).storeResource(context.Background(), stm) + return err } func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) error { @@ -313,7 +338,8 @@ func (sr *schemaRepo) Close() { sr.l.Warn().Interface("err", err).Msg("closing resource") } }() - sr.workerCloser.CloseThenWait() + sr.closer.CloseThenWait() + close(sr.eventCh) sr.RLock() defer sr.RUnlock() @@ -323,8 +349,6 @@ func (sr *schemaRepo) Close() { sr.l.Err(err).RawJSON("group", logger.Proto(g.GetSchema().Metadata)).Msg("closing") } } - sr.closer.Done() - sr.closer.CloseThenWait() } var _ Group = (*group)(nil) @@ -335,14 +359,12 @@ type group struct { db atomic.Value groupSchema atomic.Pointer[commonv1.Group] l *logger.Logger - schemaMap map[string]Resource + schemaMap map[string]*resourceSpec mapMutex sync.RWMutex } func newGroup( - groupSchema *commonv1.Group, metadata metadata.Repo, - db tsdb.Database, l *logger.Logger, resourceSupplier ResourceSupplier, ) *group { @@ -350,12 +372,49 @@ func newGroup( groupSchema: atomic.Pointer[commonv1.Group]{}, metadata: metadata, l: l, - schemaMap: make(map[string]Resource), + schemaMap: make(map[string]*resourceSpec), resourceSupplier: resourceSupplier, } + return g +} + +func newPortableGroup( + metadata metadata.Repo, + l *logger.Logger, +) *group { + g := &group{ + groupSchema: atomic.Pointer[commonv1.Group]{}, + metadata: metadata, + l: l, + schemaMap: make(map[string]*resourceSpec), + } + return g +} + +func (g *group) init(name string) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + groupSchema, err := g.metadata.GroupRegistry().GetGroup(ctx, name) + if errors.As(err, schema.ErrGRPCResourceNotFound) { + return nil + } + if err != nil { + return err + } g.groupSchema.Store(groupSchema) + if g.isPortable() { + return nil + } + db, err := g.resourceSupplier.OpenDB(groupSchema) + if err != nil { + return err + } g.db.Store(db) - return g + return nil +} + +func (g *group) isInit() bool { + return 
g.GetSchema() != nil } func (g *group) GetSchema() *commonv1.Group { @@ -366,46 +425,15 @@ func (g *group) SupplyTSDB() tsdb.Database { return g.db.Load().(tsdb.Database) } -func (g *group) setDB(db tsdb.Database) { - g.db.Store(db) -} - -func (g *group) StoreResource(ctx context.Context, resourceSchema ResourceSchema) (Resource, error) { +func (g *group) storeResource(ctx context.Context, resourceSchema ResourceSchema) (Resource, error) { g.mapMutex.Lock() defer g.mapMutex.Unlock() - key := resourceSchema.GetMetadata().GetName() - preResource := g.schemaMap[key] - var localCtx context.Context - var cancel context.CancelFunc - if preResource != nil && - resourceSchema.GetMetadata().GetModRevision() <= preResource.GetMetadata().GetModRevision() { - // we only need to check the max modifications revision observed for index rules - localCtx, cancel = context.WithTimeout(ctx, 5*time.Second) - idxRules, errIndexRules := g.metadata.IndexRules(localCtx, resourceSchema.GetMetadata()) - cancel() - if errIndexRules != nil { - return nil, errIndexRules - } - localCtx, cancel = context.WithTimeout(ctx, 5*time.Second) - topNAggrs, errTopN := g.metadata.MeasureRegistry().TopNAggregations(localCtx, resourceSchema.GetMetadata()) - cancel() - if errTopN != nil { - return nil, errTopN - } - if len(idxRules) == len(preResource.GetIndexRules()) && len(topNAggrs) == len(preResource.GetTopN()) { - maxModRevision := int64(math.Max(float64(ParseMaxModRevision(idxRules)), float64(ParseMaxModRevision(topNAggrs)))) - if preResource.MaxObservedModRevision() >= maxModRevision { - return preResource, nil - } - } - } - localCtx, cancel = context.WithTimeout(ctx, 5*time.Second) + localCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() idxRules, err := g.metadata.IndexRules(localCtx, resourceSchema.GetMetadata()) if err != nil { return nil, err } - var topNAggrs []*databasev1.TopNAggregation if _, ok := resourceSchema.(*databasev1.Measure); ok { localCtx, cancel = 
context.WithTimeout(ctx, 5*time.Second) @@ -416,20 +444,28 @@ func (g *group) StoreResource(ctx context.Context, resourceSchema ResourceSchema return nil, innerErr } } - - sm, errTS := g.resourceSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, g, ResourceSpec{ - Schema: resourceSchema, - IndexRules: idxRules, - Aggregations: topNAggrs, - }) - if errTS != nil { - return nil, errTS + resource := &resourceSpec{ + schema: resourceSchema, + indexRules: idxRules, + aggregations: topNAggrs, + } + key := resourceSchema.GetMetadata().GetName() + preResource := g.schemaMap[key] + if preResource != nil && preResource.isNewThan(resource) { + return preResource, nil + } + if !g.isPortable() { + sm, errTS := g.resourceSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, g, resource) + if errTS != nil { + return nil, errTS + } + resource.delegated = sm } - g.schemaMap[key] = sm + g.schemaMap[key] = resource if preResource != nil { _ = preResource.Close() } - return sm, nil + return resource, nil } func (g *group) deleteResource(metadata *commonv1.Metadata) error { @@ -445,6 +481,10 @@ func (g *group) deleteResource(metadata *commonv1.Metadata) error { return nil } +func (g *group) isPortable() bool { + return g.resourceSupplier == nil +} + func (g *group) LoadResource(name string) (Resource, bool) { g.mapMutex.RLock() s := g.schemaMap[name] @@ -461,11 +501,13 @@ func (g *group) close() (err error) { err = multierr.Append(err, s.Close()) } g.mapMutex.RUnlock() + if !g.isInit() { + return nil + } return multierr.Append(err, g.SupplyTSDB().Close()) } -// ParseMaxModRevision gives the max revision from resources' metadata. 
-func ParseMaxModRevision[T ResourceSchema](indexRules []T) (maxRevisionForIdxRules int64) { +func parseMaxModRevision[T ResourceSchema](indexRules []T) (maxRevisionForIdxRules int64) { maxRevisionForIdxRules = int64(0) for _, idxRule := range indexRules { if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules { diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go new file mode 100644 index 00000000..f19a4230 --- /dev/null +++ b/pkg/schema/schema.go @@ -0,0 +1,96 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package schema implements a framework to sync schema info from the metadata repository. +package schema + +import ( + "io" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/tsdb" +) + +// EventType defines actions of events. +type EventType uint8 + +// EventType support Add/Update and Delete. +// All events are idempotent. +const ( + EventAddOrUpdate EventType = iota + EventDelete +) + +// EventKind defines category of events. +type EventKind uint8 + +// This framework groups events to a hierarchy. 
A group is the root node. +const ( + EventKindGroup EventKind = iota + EventKindResource +) + +// Group is the root node, allowing get resources from its sub nodes. +type Group interface { + GetSchema() *commonv1.Group + LoadResource(name string) (Resource, bool) +} + +// MetadataEvent is the syncing message between metadata repo and this framework. +type MetadataEvent struct { + Metadata *commonv1.Metadata + Typ EventType + Kind EventKind +} + +// ResourceSchema allows get the metadata. +type ResourceSchema interface { + GetMetadata() *commonv1.Metadata +} + +// Resource allows access metadata from a local cache. +type Resource interface { + io.Closer + IndexRules() []*databasev1.IndexRule + TopN() []*databasev1.TopNAggregation + Schema() ResourceSchema + Delegated() io.Closer +} + +// ResourceSchemaSupplier allows get a ResourceSchema from the metadata. +type ResourceSchemaSupplier interface { + ResourceSchema(metadata *commonv1.Metadata) (ResourceSchema, error) +} + +// ResourceSupplier allows open a resource and its embedded tsdb. +type ResourceSupplier interface { + ResourceSchemaSupplier + OpenResource(shardNum uint32, db tsdb.Supplier, spec Resource) (io.Closer, error) + OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) +} + +// Repository is the collection of several hierarchies groups by a "Group". +type Repository interface { + Watcher() + SendMetadataEvent(MetadataEvent) + StoreGroup(groupMeta *commonv1.Metadata) (*group, error) + LoadGroup(name string) (Group, bool) + LoadResource(metadata *commonv1.Metadata) (Resource, bool) + Close() + StopCh() <-chan struct{} +}