This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new a0448131 Fix top-n high cardinality (#569)
a0448131 is described below
commit a044813184065a5bdb264ea2d453c70f4aac5711
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Dec 11 08:22:46 2024 +0800
Fix top-n high cardinality (#569)
---
CHANGES.md | 2 +
banyand/measure/topn.go | 419 ++++++++++++++++-------
banyand/measure/topn_test.go | 91 +++++
banyand/measure/write.go | 2 +-
banyand/query/processor_topn.go | 22 +-
pkg/partition/index.go | 3 +
pkg/pb/v1/series.go | 7 +-
pkg/pb/v1/value.go | 33 ++
pkg/query/logical/measure/topn_analyzer.go | 132 +++----
pkg/query/logical/measure/topn_plan_localscan.go | 275 +++++++++------
pkg/schema/cache.go | 7 +-
pkg/schema/init.go | 178 +++-------
pkg/schema/schema.go | 3 +
test/cases/topn/data/want/aggr_desc.yaml | 5 +-
14 files changed, 747 insertions(+), 432 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index cc1ca646..58a8b021 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@ Release Notes.
- Index: Support InsertIfAbsent functionality which ensures documents are only
inserted if their docIDs are not already present in the current index. There is
a exception for the documents with extra index fields more than the entity's
index fields.
- Measure: Introduce "index_mode" to save data exclusively in the series
index, ideal for non-timeseries measures.
- Index: Use numeric index type to support Int and Float
+- TopN: Group top-n pre-calculation results by the group key in the newly
introduced `_top_n_result` measure, which is used to store the pre-calculation
result.
### Bug Fixes
@@ -22,6 +23,7 @@ Release Notes.
- Fix: View configuration on Property page.
- UI: Add `indexMode` to display on the measure page.
- UI: Add `NoSort` Field to IndexRule page.
+- Metadata: Fix the bug that the cache loads a nil value, which is an unknown
index rule, on the index rule binding.
### Documentation
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index c64270aa..c7649781 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -18,6 +18,7 @@
package measure
import (
+ "bytes"
"context"
"encoding/base64"
"fmt"
@@ -32,7 +33,6 @@ import (
"golang.org/x/exp/slices"
"google.golang.org/protobuf/types/known/timestamppb"
- "github.com/apache/skywalking-banyandb/api/common"
apiData "github.com/apache/skywalking-banyandb/api/data"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -40,13 +40,17 @@ import (
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/flow/streaming"
"github.com/apache/skywalking-banyandb/pkg/flow/streaming/sources"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
+ "github.com/apache/skywalking-banyandb/pkg/schema"
)
const (
@@ -65,18 +69,20 @@ type dataPointWithEntityValues struct {
*measurev1.DataPointValue
entityValues []*modelv1.TagValue
seriesID uint64
+ shardID uint32
}
type topNStreamingProcessor struct {
- m *measure
+ pipeline queue.Queue
streamingFlow flow.Flow
+ in chan flow.StreamRecord
l *logger.Logger
- pipeline queue.Queue
topNSchema *databasev1.TopNAggregation
src chan interface{}
- in chan flow.StreamRecord
+ m *measure
errCh <-chan error
stopCh chan struct{}
+ buf []byte
flow.ComponentState
interval time.Duration
sortDirection modelv1.Sort
@@ -137,134 +143,93 @@ func (t *topNStreamingProcessor)
writeStreamRecord(record flow.StreamRecord) err
var err error
publisher := t.pipeline.NewBatchPublisher(resultPersistencyTimeout)
defer publisher.Close()
+ topNValue := GenerateTopNValue()
+ defer ReleaseTopNValue(topNValue)
for group, tuples := range tuplesGroups {
if e := t.l.Debug(); e.Enabled() {
- e.Str("TopN", t.topNSchema.GetMetadata().GetName()).
- Str("group", group).
- Int("rankNums", len(tuples)).
- Msg("Write tuples")
+ for i := range tuples {
+ tuple := tuples[i]
+ data :=
tuple.V2.(flow.StreamRecord).Data().(flow.Data)
+ e.
+ Int("rankNums", i+1).
+ Str("entityValues", fmt.Sprintf("%v",
data[0])).
+ Int("value", int(data[2].(int64))).
+ Time("eventTime", eventTime).
+ Msgf("Write tuples %s %s",
t.topNSchema.GetMetadata().GetName(), group)
+ }
}
- for rankNum, tuple := range tuples {
- fieldValue := tuple.V1.(int64)
+ topNValue.Reset()
+ topNValue.setMetadata(t.topNSchema.GetFieldName(),
t.m.schema.Entity.TagNames)
+ var shardID uint32
+ for _, tuple := range tuples {
data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
- err = multierr.Append(err, t.writeData(publisher,
eventTime, fieldValue, data, rankNum))
+ topNValue.addValue(
+ tuple.V1.(int64),
+ data[0].([]*modelv1.TagValue),
+ )
+ shardID = data[3].(uint32)
}
- }
- return err
-}
-
-func (t *topNStreamingProcessor) writeData(publisher queue.BatchPublisher,
eventTime time.Time, fieldValue int64,
- data flow.Data, rankNum int,
-) error {
- var tagValues []*modelv1.TagValue
- if len(t.topNSchema.GetGroupByTagNames()) > 0 {
- var ok bool
- if tagValues, ok = data[3].([]*modelv1.TagValue); !ok {
- return errors.New("fail to extract tag values from topN
result")
+ entityValues := []*modelv1.TagValue{
+ {
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value:
t.topNSchema.GetMetadata().GetName(),
+ },
+ },
+ },
+ {
+ Value: &modelv1.TagValue_Int{
+ Int: &modelv1.Int{
+ Value: int64(t.sortDirection),
+ },
+ },
+ },
+ {
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: group,
+ },
+ },
+ },
}
- }
- for _, tv := range tagValues {
- if tv.Value == nil {
- t.l.Warn().Msg("tag value is nil")
+ t.buf = t.buf[:0]
+ if t.buf, err = topNValue.marshal(t.buf); err != nil {
+ return err
}
- }
- series, shardID, err := t.locate(tagValues, rankNum)
- if err != nil {
- return err
- }
-
- iwr := &measurev1.InternalWriteRequest{
- Request: &measurev1.WriteRequest{
- MessageId: uint64(time.Now().UnixNano()),
- Metadata: t.topNSchema.GetMetadata(),
- DataPoint: &measurev1.DataPointValue{
- Timestamp: timestamppb.New(eventTime),
- TagFamilies: []*modelv1.TagFamilyForWrite{
- {
- Tags: append(
-
data[0].([]*modelv1.TagValue),
- // SortDirection
- &modelv1.TagValue{
- Value:
&modelv1.TagValue_Int{
- Int:
&modelv1.Int{
-
Value: int64(t.sortDirection),
- },
- },
- },
- // RankNumber
- &modelv1.TagValue{
- Value:
&modelv1.TagValue_Int{
- Int:
&modelv1.Int{
-
Value: int64(rankNum),
- },
- },
- },
- ),
+ iwr := &measurev1.InternalWriteRequest{
+ Request: &measurev1.WriteRequest{
+ MessageId: uint64(time.Now().UnixNano()),
+ Metadata: &commonv1.Metadata{Name:
schema.TopNSchemaName, Group: t.topNSchema.GetMetadata().Group},
+ DataPoint: &measurev1.DataPointValue{
+ Timestamp: timestamppb.New(eventTime),
+ TagFamilies:
[]*modelv1.TagFamilyForWrite{
+ {Tags: entityValues},
},
- },
- Fields: []*modelv1.FieldValue{
- {
- Value: &modelv1.FieldValue_Int{
- Int: &modelv1.Int{
- Value:
fieldValue,
+ Fields: []*modelv1.FieldValue{
+ {
+ Value:
&modelv1.FieldValue_BinaryData{
+ BinaryData:
bytes.Clone(t.buf),
},
},
},
},
},
- },
- EntityValues: series.EntityValues,
- ShardId: uint32(shardID),
+ EntityValues: entityValues,
+ ShardId: shardID,
+ }
+ message :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr)
+ _, err = publisher.Publish(context.TODO(),
apiData.TopicMeasureWrite, message)
+ if err != nil {
+ return err
+ }
}
- message :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr)
- _, errWritePub := publisher.Publish(context.TODO(),
apiData.TopicMeasureWrite, message)
- return errWritePub
+ return err
}
func (t *topNStreamingProcessor) downSampleTimeBucket(eventTimeMillis int64)
time.Time {
return time.UnixMilli(eventTimeMillis -
eventTimeMillis%t.interval.Milliseconds())
}
-func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum
int) (*pbv1.Series, common.ShardID, error) {
- if len(tagValues) != 0 && len(t.topNSchema.GetGroupByTagNames()) !=
len(tagValues) {
- return nil, 0, errors.New("no enough tag values for the entity")
- }
- series := &pbv1.Series{
- Subject: t.topNSchema.GetMetadata().GetName(),
- EntityValues: make([]*modelv1.TagValue, 1+1+len(tagValues)),
- }
-
- copy(series.EntityValues, tagValues)
- series.EntityValues[len(series.EntityValues)-2] = &modelv1.TagValue{
- Value: &modelv1.TagValue_Int{
- Int: &modelv1.Int{
- Value: int64(t.sortDirection),
- },
- },
- }
- series.EntityValues[len(series.EntityValues)-1] = &modelv1.TagValue{
- Value: &modelv1.TagValue_Int{
- Int: &modelv1.Int{
- Value: int64(rankNum),
- },
- },
- }
- if err := series.Marshal(); err != nil {
- return nil, 0, fmt.Errorf("fail to marshal series: %w", err)
- }
- src := make([]byte, len(series.Buffer))
- copy(src, series.Buffer)
- var s1 pbv1.Series
- if err := s1.Unmarshal(src); err != nil {
- return nil, 0, fmt.Errorf("fail to unmarshal series
encoded:[%s] tagValues:[%s]: %w", series.Buffer, series.EntityValues, err)
- }
- id, err := partition.ShardID(series.Buffer, t.m.shardNum)
- if err != nil {
- return nil, 0, err
- }
- return series, common.ShardID(id), nil
-}
-
func (t *topNStreamingProcessor) start() *topNStreamingProcessor {
flushInterval := t.interval
if flushInterval > maxFlushInterval {
@@ -326,7 +291,7 @@ func (manager *topNProcessorManager) Close() error {
return err
}
-func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, request
*measurev1.InternalWriteRequest) {
+func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, shardID
uint32, request *measurev1.InternalWriteRequest) {
go func() {
manager.RLock()
defer manager.RUnlock()
@@ -336,6 +301,7 @@ func (manager *topNProcessorManager)
onMeasureWrite(seriesID uint64, request *me
request.GetRequest().GetDataPoint(),
request.GetEntityValues(),
seriesID,
+ shardID,
},
request.GetRequest().GetDataPoint().GetTimestamp())
}
}
@@ -381,6 +347,7 @@ func (manager *topNProcessorManager) start() error {
stopCh: make(chan struct{}),
streamingFlow: streamingFlow,
pipeline: manager.pipeline,
+ buf: make([]byte, 0, 64),
}
processorList[i] = processor.start()
}
@@ -439,8 +406,9 @@ func (manager *topNProcessorManager) buildMapper(fieldName
string, groupByNames
"",
// field value as v2
dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
- // groupBy tag values as v3
- nil,
+ // shardID values as v3
+ dpWithEvs.shardID,
+ // seriesID values as v4
dpWithEvs.seriesID,
}
}, nil
@@ -455,15 +423,14 @@ func (manager *topNProcessorManager)
buildMapper(fieldName string, groupByNames
// EntityValues as identity
dpWithEvs.entityValues,
// save string representation of group values as the
key, i.e. v1
- strings.Join(transform(groupLocator, func(locator
partition.TagLocator) string {
- return
stringify(extractTagValue(dpWithEvs.DataPointValue, locator))
- }), "|"),
+ GroupName(transform(groupLocator, func(locator
partition.TagLocator) string {
+ return
Stringify(extractTagValue(dpWithEvs.DataPointValue, locator))
+ })),
// field value as v2
dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
- // groupBy tag values as v3
- transform(groupLocator, func(locator
partition.TagLocator) *modelv1.TagValue {
- return
extractTagValue(dpWithEvs.DataPointValue, locator)
- }),
+ // shardID values as v3
+ dpWithEvs.shardID,
+ // seriesID values as v4
dpWithEvs.seriesID,
}
}, nil
@@ -499,7 +466,8 @@ func extractTagValue(dpv *measurev1.DataPointValue, locator
partition.TagLocator
return tagFamily.GetTags()[locator.TagOffset]
}
-func stringify(tagValue *modelv1.TagValue) string {
+// Stringify converts a TagValue to a string.
+func Stringify(tagValue *modelv1.TagValue) string {
switch v := tagValue.GetValue().(type) {
case *modelv1.TagValue_Str:
return v.Str.GetValue()
@@ -525,3 +493,208 @@ func transform[I, O any](input []I, mapper func(I) O) []O
{
}
return output
}
+
+// GenerateTopNValue returns a new TopNValue instance.
+func GenerateTopNValue() *TopNValue {
+ v := topNValuePool.Get()
+ if v == nil {
+ return &TopNValue{}
+ }
+ return v
+}
+
+// ReleaseTopNValue releases a TopNValue instance.
+func ReleaseTopNValue(bi *TopNValue) {
+ bi.Reset()
+ topNValuePool.Put(bi)
+}
+
+var topNValuePool = pool.Register[*TopNValue]("measure-topNValue")
+
+// TopNValue represents the topN value.
+type TopNValue struct {
+ valueName string
+ entityTagNames []string
+ values []int64
+ entities [][]*modelv1.TagValue
+ entityValues [][]byte
+ buf []byte
+ encodeType encoding.EncodeType
+ firstValue int64
+}
+
+func (t *TopNValue) setMetadata(valueName string, entityTagNames []string) {
+ t.valueName = valueName
+ t.entityTagNames = entityTagNames
+}
+
+func (t *TopNValue) addValue(value int64, entityValues []*modelv1.TagValue) {
+ t.values = append(t.values, value)
+ t.entities = append(t.entities, entityValues)
+}
+
+// Values returns the valueName, entityTagNames, values, and entities.
+func (t *TopNValue) Values() (string, []string, []int64,
[][]*modelv1.TagValue) {
+ return t.valueName, t.entityTagNames, t.values, t.entities
+}
+
+// Reset resets the TopNValue.
+func (t *TopNValue) Reset() {
+ t.valueName = ""
+ t.entityTagNames = t.entityTagNames[:0]
+ t.values = t.values[:0]
+ for i := range t.entities {
+ t.entities[i] = t.entities[i][:0]
+ }
+ t.entities = t.entities[:0]
+ t.buf = t.buf[:0]
+ t.encodeType = encoding.EncodeTypeUnknown
+ t.firstValue = 0
+ for i := range t.entityValues {
+ t.entityValues[i] = t.entityValues[i][:0]
+ }
+ t.entityValues = t.entityValues[:0]
+}
+
+func (t *TopNValue) resizeEntityValues(size int) [][]byte {
+ entityValues := t.entityValues
+ if n := size - cap(entityValues); n > 0 {
+ entityValues = append(entityValues[:cap(entityValues)],
make([][]byte, n)...)
+ }
+ t.entityValues = entityValues[:size]
+ return t.entityValues
+}
+
+func (t *TopNValue) resizeEntities(size int, entitySize int)
[][]*modelv1.TagValue {
+ entities := t.entities
+ if n := size - cap(t.entities); n > 0 {
+ entities = append(entities[:cap(entities)],
make([][]*modelv1.TagValue, n)...)
+ }
+ t.entities = entities[:size]
+ for i := range t.entities {
+ entity := t.entities[i]
+ if n := entitySize - cap(entity); n > 0 {
+ entity = append(entity[:cap(entity)],
make([]*modelv1.TagValue, n)...)
+ }
+ t.entities[i] = entity[:0]
+ }
+ return t.entities
+}
+
+func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
+ if len(t.values) == 0 {
+ return nil, errors.New("values is empty")
+ }
+ dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+ for _, entityTagName := range t.entityTagNames {
+ dst = encoding.EncodeBytes(dst,
convert.StringToBytes(entityTagName))
+ }
+
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.values)))
+
+ t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf,
t.values)
+ dst = append(dst, byte(t.encodeType))
+ dst = encoding.VarInt64ToBytes(dst, t.firstValue)
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+ dst = append(dst, t.buf...)
+
+ var err error
+ t.entityValues = t.resizeEntityValues(len(t.entities))
+ for i, tvv := range t.entities {
+ t.entityValues[i], err =
pbv1.MarshalTagValues(t.entityValues[i], tvv)
+ if err != nil {
+ return nil, err
+ }
+ }
+ dst = encoding.EncodeBytesBlock(dst, t.entityValues)
+ return dst, nil
+}
+
+// Unmarshal unmarshals the TopNValue from the src.
+func (t *TopNValue) Unmarshal(src []byte, decoder *encoding.BytesBlockDecoder)
error {
+ var err error
+ src, nameBytes, err := encoding.DecodeBytes(src)
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal topNValue.name: %w", err)
+ }
+ t.valueName = convert.BytesToString(nameBytes)
+
+ var entityTagNamesCount uint64
+ src, entityTagNamesCount, err = encoding.BytesToVarUint64(src)
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal
topNValue.entityTagNamesCount: %w", err)
+ }
+ t.entityTagNames = make([]string, 0, entityTagNamesCount)
+ var entityTagNameBytes []byte
+ for i := uint64(0); i < entityTagNamesCount; i++ {
+ src, entityTagNameBytes, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal
topNValue.entityTagName: %w", err)
+ }
+ t.entityTagNames = append(t.entityTagNames,
convert.BytesToString(entityTagNameBytes))
+ }
+
+ var valuesCount uint64
+ src, valuesCount, err = encoding.BytesToVarUint64(src)
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal topNValue.valuesCount: %w",
err)
+ }
+
+ if len(src) < 1 {
+ return fmt.Errorf("cannot unmarshal topNValue.encodeType: src
is too short")
+ }
+ t.encodeType = encoding.EncodeType(src[0])
+ src = src[1:]
+
+ if len(src) < 1 {
+ return fmt.Errorf("cannot unmarshal topNValue.firstValue: src
is too short")
+ }
+ src, t.firstValue, err = encoding.BytesToVarInt64(src)
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal topNValue.firstValue: %w",
err)
+ }
+ if len(src) < 1 {
+ return fmt.Errorf("cannot unmarshal topNValue.valueLen: src is
too short")
+ }
+ var valueLen uint64
+ src, valueLen, err = encoding.BytesToVarUint64(src)
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal topNValue.valueLen: %w",
err)
+ }
+
+ if uint64(len(src)) < valueLen {
+ return fmt.Errorf("src is too short for reading string with
size %d; len(src)=%d", valueLen, len(src))
+ }
+
+ t.values, err = encoding.BytesToInt64List(t.values, src[:valueLen],
t.encodeType, t.firstValue, int(valuesCount))
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal topNValue.values: %w", err)
+ }
+
+ if uint64(len(src)) < valueLen {
+ return fmt.Errorf("src is too short for reading string with
size %d; len(src)=%d", valueLen, len(src))
+ }
+
+ decoder.Reset()
+ t.entityValues, err = decoder.Decode(t.entityValues, src[valueLen:],
valuesCount)
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal topNValue.entityValues:
%w", err)
+ }
+ t.resizeEntities(len(t.entityValues), int(entityTagNamesCount))
+ for i, ev := range t.entityValues {
+ t.buf, t.entities[i], err = pbv1.UnmarshalTagValues(t.buf,
t.entities[i], ev)
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal
topNValue.entityValues[%d]: %w", i, err)
+ }
+ if len(t.entities[i]) != len(t.entityTagNames) {
+ return fmt.Errorf("entityValues[%d] length is not equal
to entityTagNames", i)
+ }
+ }
+ return nil
+}
+
+// GroupName returns the group name.
+func GroupName(groupTags []string) string {
+ return strings.Join(groupTags, "|")
+}
diff --git a/banyand/measure/topn_test.go b/banyand/measure/topn_test.go
new file mode 100644
index 00000000..b416ec6b
--- /dev/null
+++ b/banyand/measure/topn_test.go
@@ -0,0 +1,91 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+)
+
+func TestTopNValue_MarshalUnmarshal(t *testing.T) {
+ tests := []struct {
+ topNVal *TopNValue
+ name string
+ }{
+ {
+ name: "simple case",
+ topNVal: &TopNValue{
+ valueName: "testValue",
+ entityTagNames: []string{"tag1", "tag2"},
+ values: []int64{1, 2, 3},
+ entities: [][]*modelv1.TagValue{
+ {
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity1"}}},
+ },
+ {
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity2"}}},
+ },
+ {
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity3"}}},
+ },
+ },
+ },
+ },
+ {
+ name: "single",
+ topNVal: &TopNValue{
+ valueName: "testValue",
+ entityTagNames: []string{"tag1", "tag2"},
+ values: []int64{1},
+ entities: [][]*modelv1.TagValue{
+ {
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+ {Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity1"}}},
+ },
+ },
+ },
+ },
+ }
+ decoder := generateColumnValuesDecoder()
+ defer releaseColumnValuesDecoder(decoder)
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Marshal the topNValue
+ dst, err := tt.topNVal.marshal(nil)
+ require.NoError(t, err)
+
+ // Unmarshal the topNValue
+ var decodedTopNValue TopNValue
+ err = decodedTopNValue.Unmarshal(dst, decoder)
+ require.NoError(t, err)
+
+ // Compare the original and decoded topNValue
+ require.Equal(t, tt.topNVal.valueName,
decodedTopNValue.valueName)
+ require.Equal(t, tt.topNVal.entityTagNames,
decodedTopNValue.entityTagNames)
+ require.Equal(t, tt.topNVal.values,
decodedTopNValue.values)
+ require.Equal(t, tt.topNVal.entities,
decodedTopNValue.entities)
+ })
+ }
+}
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index b1e9810b..859cf09a 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -145,7 +145,7 @@ func (w *writeCallback) handle(dst
map[string]*dataPointsInGroup, writeEvent *me
dpt.dataPoints.fields = append(dpt.dataPoints.fields, field)
if stm.processorManager != nil {
- stm.processorManager.onMeasureWrite(uint64(series.ID),
&measurev1.InternalWriteRequest{
+ stm.processorManager.onMeasureWrite(uint64(series.ID),
uint32(shardID), &measurev1.InternalWriteRequest{
Request: &measurev1.WriteRequest{
Metadata: stm.GetSchema().Metadata,
DataPoint: req.DataPoint,
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index e042df76..234230f2 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -40,6 +40,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/query/aggregation"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
logical_measure
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
+ pkgschema "github.com/apache/skywalking-banyandb/pkg/schema"
)
type topNQueryProcessor struct {
@@ -94,21 +95,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
return
}
- topNResultMeasure, err := t.measureService.Measure(topNMetadata)
- if err != nil {
- t.log.Error().Err(err).
- Str("topN", topNMetadata.GetName()).
- Msg("fail to find topN result measure")
- return
- }
-
- s, err := logical_measure.BuildTopNSchema(topNResultMeasure.GetSchema())
- if err != nil {
- t.log.Error().Err(err).
- Str("topN", topNMetadata.GetName()).
- Msg("fail to build schema")
- }
- plan, err := logical_measure.TopNAnalyze(ctx, request,
topNResultMeasure.GetSchema(), sourceMeasure.GetSchema(), s)
+ plan, err := logical_measure.TopNAnalyze(request,
sourceMeasure.GetSchema(), topNSchema)
if err != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for topn %s: %v", topNMetadata.GetName(), err))
return
@@ -117,6 +104,11 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
if e := ml.Debug(); e.Enabled() {
e.Str("plan", plan.String()).Msg("topn plan")
}
+ topNResultMeasure, err :=
t.measureService.Measure(pkgschema.GetTopNSchemaMetadata(topNMetadata.Group))
+ if err != nil {
+ ml.Error().Err(err).Str("topN",
topNMetadata.GetName()).Msg("fail to find topn result measure")
+ return
+ }
var tracer *query.Tracer
var span *query.Span
if request.Trace {
diff --git a/pkg/partition/index.go b/pkg/partition/index.go
index bbb4daf0..f3c69833 100644
--- a/pkg/partition/index.go
+++ b/pkg/partition/index.go
@@ -49,6 +49,9 @@ func ParseIndexRuleLocators(entity *databasev1.Entity,
families []*databasev1.Ta
}
findIndexRuleByTagName := func(tagName string) *databasev1.IndexRule {
for i := range indexRules {
+ if indexRules[i] == nil {
+ continue
+ }
for j := range indexRules[i].Tags {
if indexRules[i].Tags[j] == tagName {
return indexRules[i]
diff --git a/pkg/pb/v1/series.go b/pkg/pb/v1/series.go
index 542343d9..31a6f289 100644
--- a/pkg/pb/v1/series.go
+++ b/pkg/pb/v1/series.go
@@ -48,10 +48,9 @@ func (s *Series) CopyTo(dst *Series) {
func (s *Series) Marshal() error {
s.Buffer = marshalEntityValue(s.Buffer,
convert.StringToBytes(s.Subject))
var err error
- for _, tv := range s.EntityValues {
- if s.Buffer, err = marshalTagValue(s.Buffer, tv); err != nil {
- return errors.WithMessage(err, "marshal subject and
entity values")
- }
+ s.Buffer, err = MarshalTagValues(s.Buffer, s.EntityValues)
+ if err != nil {
+ return errors.WithMessage(err, "marshal subject and entity
values")
}
s.ID = common.SeriesID(convert.Hash(s.Buffer))
return nil
diff --git a/pkg/pb/v1/value.go b/pkg/pb/v1/value.go
index 1e73e6c7..b9c6fa80 100644
--- a/pkg/pb/v1/value.go
+++ b/pkg/pb/v1/value.go
@@ -97,6 +97,33 @@ func MustTagValueToStr(tag *modelv1.TagValue) string {
}
}
+// MarshalTagValues marshals tag values.
+func MarshalTagValues(dest []byte, tags []*modelv1.TagValue) ([]byte, error) {
+ var err error
+ for _, tag := range tags {
+ dest, err = marshalTagValue(dest, tag)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return dest, nil
+}
+
+// UnmarshalTagValues unmarshals tag values.
+func UnmarshalTagValues(dest []byte, destTags []*modelv1.TagValue, src []byte)
([]byte, []*modelv1.TagValue, error) {
+ var err error
+ var tag *modelv1.TagValue
+ for len(src) > 0 {
+ dest = dest[:0]
+ dest, src, tag, err = unmarshalTagValue(dest, src)
+ if err != nil {
+ return nil, nil, err
+ }
+ destTags = append(destTags, tag)
+ }
+ return dest, destTags, nil
+}
+
func marshalTagValue(dest []byte, tv *modelv1.TagValue) ([]byte, error) {
dest = append(dest, byte(MustTagValueToValueType(tv)))
switch tv.Value.(type) {
@@ -136,6 +163,9 @@ func unmarshalTagValue(dest []byte, src []byte) ([]byte,
[]byte, *modelv1.TagVal
if dest, src, err = unmarshalEntityValue(dest, src[1:]); err !=
nil {
return nil, nil, nil, errors.WithMessage(err,
"unmarshal string tag value")
}
+ if len(dest) == 0 {
+ return dest, src, NullTagValue, nil
+ }
return dest, src, &modelv1.TagValue{
Value: &modelv1.TagValue_Str{
Str: &modelv1.Str{
@@ -158,6 +188,9 @@ func unmarshalTagValue(dest []byte, src []byte) ([]byte,
[]byte, *modelv1.TagVal
if dest, src, err = unmarshalEntityValue(dest, src[1:]); err !=
nil {
return nil, nil, nil, errors.WithMessage(err,
"unmarshal binary tag value")
}
+ if len(dest) == 0 {
+ return dest, src, NullTagValue, nil
+ }
data := make([]byte, len(dest))
copy(data, dest)
return dest, src, &modelv1.TagValue{
diff --git a/pkg/query/logical/measure/topn_analyzer.go
b/pkg/query/logical/measure/topn_analyzer.go
index 4d72787f..eed24da5 100644
--- a/pkg/query/logical/measure/topn_analyzer.go
+++ b/pkg/query/logical/measure/topn_analyzer.go
@@ -19,69 +19,49 @@
package measure
import (
- "context"
- "fmt"
+ "errors"
- commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
- modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
+ pkgschema "github.com/apache/skywalking-banyandb/pkg/schema"
)
-// BuildTopNSchema returns Schema loaded from the metadata repository.
-func BuildTopNSchema(md *databasev1.Measure) (logical.Schema, error) {
- md.GetEntity()
-
- ms := &schema{
- common: &logical.CommonSchema{
- TagSpecMap: make(map[string]*logical.TagSpec),
- EntityList: md.GetEntity().GetTagNames(),
- },
- measure: md,
- fieldMap: make(map[string]*logical.FieldSpec),
- }
-
- ms.common.RegisterTagFamilies(md.GetTagFamilies())
-
- for fieldIdx, spec := range md.GetFields() {
- ms.registerField(fieldIdx, spec)
- }
-
- return ms, nil
-}
-
// TopNAnalyze converts logical expressions to executable operation tree
represented by Plan.
-func TopNAnalyze(_ context.Context, criteria *measurev1.TopNRequest, schema
*databasev1.Measure,
- sourceMeasureSchema *databasev1.Measure, s logical.Schema,
+func TopNAnalyze(criteria *measurev1.TopNRequest, sourceMeasureSchema
*databasev1.Measure, topNAggSchema *databasev1.TopNAggregation,
) (logical.Plan, error) {
- groupByProjectionTags := sourceMeasureSchema.GetEntity().GetTagNames()
- groupByTags := make([][]*logical.Tag, len(schema.GetTagFamilies()))
- tagFamily := schema.GetTagFamilies()[0]
- groupByTags[0] = logical.NewTags(tagFamily.GetName(),
groupByProjectionTags...)
-
- if len(schema.GetFields()) != 1 {
- return nil, fmt.Errorf("topN schema fields count should be 1
but got %d", len(schema.GetFields()))
- }
- projectionFields := make([]*logical.Field, 1)
- fieldName := schema.GetFields()[0].GetName()
- projectionFields[0] = logical.NewField(fieldName)
// parse fields
- plan := parse(criteria, schema.GetMetadata(), projectionFields,
groupByTags)
+ timeRange := criteria.GetTimeRange()
+ var plan logical.UnresolvedPlan
+ plan = &unresolvedLocalScan{
+ name: criteria.Name,
+ startTime: timeRange.GetBegin().AsTime(),
+ endTime: timeRange.GetEnd().AsTime(),
+ conditions: criteria.Conditions,
+ sort: criteria.FieldValueSort,
+ groupByTags: topNAggSchema.GroupByTagNames,
+ }
+ s, err := buildVirtualSchema(sourceMeasureSchema,
topNAggSchema.FieldName)
+ if err != nil {
+ return nil, err
+ }
if criteria.GetAgg() != 0 {
+ groupByProjectionTags :=
sourceMeasureSchema.GetEntity().GetTagNames()
+ groupByTags :=
[][]*logical.Tag{logical.NewTags(pkgschema.TopNTagFamily,
groupByProjectionTags...)}
plan = newUnresolvedGroupBy(plan, groupByTags, false)
plan = newUnresolvedAggregation(plan,
- projectionFields[0],
+ &logical.Field{Name: topNAggSchema.FieldName},
criteria.GetAgg(),
true)
}
plan = top(plan, &measurev1.QueryRequest_Top{
Number: criteria.GetTopN(),
- FieldName: fieldName,
+ FieldName: topNAggSchema.FieldName,
FieldValueSort: criteria.GetFieldValueSort(),
})
+
p, err := plan.Analyze(s)
if err != nil {
return nil, err
@@ -89,26 +69,54 @@ func TopNAnalyze(_ context.Context, criteria
*measurev1.TopNRequest, schema *dat
return p, nil
}
-func parse(criteria *measurev1.TopNRequest, metadata *commonv1.Metadata,
- projFields []*logical.Field, projTags [][]*logical.Tag,
-) logical.UnresolvedPlan {
- timeRange := criteria.GetTimeRange()
- return local(timeRange.GetBegin().AsTime(), timeRange.GetEnd().AsTime(),
- metadata, projTags, projFields, buildConditions(criteria),
criteria.GetFieldValueSort())
-}
-
-func buildConditions(criteria *measurev1.TopNRequest) []*modelv1.Condition {
- return append([]*modelv1.Condition{
- {
- Name: "sortDirection",
- Op: modelv1.Condition_BINARY_OP_EQ,
- Value: &modelv1.TagValue{
- Value: &modelv1.TagValue_Int{
- Int: &modelv1.Int{
- Value:
int64(criteria.GetFieldValueSort().Number()),
- },
- },
+func buildVirtualSchema(sourceMeasureSchema *databasev1.Measure, fieldName
string) (logical.Schema, error) {
+ var tags []*databasev1.TagSpec
+ for _, tag := range sourceMeasureSchema.GetEntity().TagNames {
+ for i := range sourceMeasureSchema.GetTagFamilies() {
+ for j := range
sourceMeasureSchema.GetTagFamilies()[i].Tags {
+ if
sourceMeasureSchema.GetTagFamilies()[i].Tags[j].Name == tag {
+ tags = append(tags,
sourceMeasureSchema.GetTagFamilies()[i].Tags[j])
+ if len(tags) ==
len(sourceMeasureSchema.GetEntity().TagNames) {
+ break
+ }
+ }
+ }
+ }
+ }
+ if len(tags) != len(sourceMeasureSchema.GetEntity().TagNames) {
+ return nil, errors.New("failed to build topn schema, source
measure schema is invalid:" + sourceMeasureSchema.String())
+ }
+ var fields []*databasev1.FieldSpec
+ for _, field := range sourceMeasureSchema.GetFields() {
+ if field.GetName() == fieldName {
+ fields = append(fields, field)
+ break
+ }
+ }
+ md := &databasev1.Measure{
+ Metadata: sourceMeasureSchema.Metadata,
+ TagFamilies: []*databasev1.TagFamilySpec{
+ {
+ Name: pkgschema.TopNTagFamily,
+ Tags: tags,
},
},
- }, criteria.GetConditions()...)
+ Fields: fields,
+ Entity: &databasev1.Entity{
+ TagNames: sourceMeasureSchema.Entity.TagNames,
+ },
+ }
+ ms := &schema{
+ common: &logical.CommonSchema{
+ TagSpecMap: make(map[string]*logical.TagSpec),
+ EntityList: md.GetEntity().GetTagNames(),
+ },
+ measure: md,
+ fieldMap: make(map[string]*logical.FieldSpec),
+ }
+ ms.common.RegisterTagFamilies(md.GetTagFamilies())
+ for fieldIdx, spec := range md.GetFields() {
+ ms.registerField(fieldIdx, spec)
+ }
+ return ms, nil
}
diff --git a/pkg/query/logical/measure/topn_plan_localscan.go
b/pkg/query/logical/measure/topn_plan_localscan.go
index 9608af95..ee319af6 100644
--- a/pkg/query/logical/measure/topn_plan_localscan.go
+++ b/pkg/query/logical/measure/topn_plan_localscan.go
@@ -24,95 +24,105 @@ import (
"time"
"github.com/pkg/errors"
+ "google.golang.org/protobuf/types/known/timestamppb"
- commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
- "github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/banyand/measure"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/query/model"
+ pkgschema "github.com/apache/skywalking-banyandb/pkg/schema"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
-var _ logical.UnresolvedPlan = (*unresolvedLocalScan)(nil)
+var (
+ _ logical.UnresolvedPlan = (*unresolvedLocalScan)(nil)
+ fieldProjection =
[]string{pkgschema.TopNFieldName}
+)
type unresolvedLocalScan struct {
- startTime time.Time
- endTime time.Time
- metadata *commonv1.Metadata
- conditions []*modelv1.Condition
- projectionTags [][]*logical.Tag
- projectionFields []*logical.Field
- sort modelv1.Sort
+ startTime time.Time
+ endTime time.Time
+ name string
+ conditions []*modelv1.Condition
+ groupByTags []string
+ sort modelv1.Sort
}
func (uls *unresolvedLocalScan) Analyze(s logical.Schema) (logical.Plan,
error) {
- var projTagsRefs [][]*logical.TagRef
- projTags := make([]model.TagProjection, len(uls.projectionTags))
- if len(uls.projectionTags) > 0 {
- for i := range uls.projectionTags {
- for _, tag := range uls.projectionTags[i] {
- projTags[i].Family = tag.GetFamilyName()
- projTags[i].Names = append(projTags[i].Names,
tag.GetTagName())
- }
- }
- var err error
- projTagsRefs, err = s.CreateTagRef(uls.projectionTags...)
- if err != nil {
- return nil, err
- }
+ tr := timestamp.NewInclusiveTimeRange(uls.startTime, uls.endTime)
+ groupByTags, err := uls.parseGroupByTags()
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to locate entity")
}
-
- var projFieldRefs []*logical.FieldRef
- var projField []string
- if len(uls.projectionFields) > 0 {
- for i := range uls.projectionFields {
- projField = append(projField,
uls.projectionFields[i].Name)
- }
- var err error
- projFieldRefs, err = s.CreateFieldRef(uls.projectionFields...)
- if err != nil {
- return nil, err
- }
+ entities := [][]*modelv1.TagValue{
+ {
+ {
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: uls.name,
+ },
+ },
+ },
+ {
+ Value: &modelv1.TagValue_Int{
+ Int: &modelv1.Int{
+ Value: int64(uls.sort),
+ },
+ },
+ },
+ pbv1.AnyTagValue,
+ },
}
-
- entity, err := uls.locateEntity(s.EntityList())
- if err != nil {
- return nil, err
+ if len(groupByTags) > 0 {
+ entities[0][len(entities[0])-1] = &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: measure.GroupName(groupByTags),
+ },
+ },
+ }
}
-
return &localScan{
- timeRange:
timestamp.NewInclusiveTimeRange(uls.startTime, uls.endTime),
- schema: s,
- projectionTagsRefs: projTagsRefs,
- projectionFieldsRefs: projFieldRefs,
- projectionTags: projTags,
- projectionFields: projField,
- metadata: uls.metadata,
- entity: entity,
- l: logger.GetLogger("topn", "measure",
uls.metadata.Group, uls.metadata.Name, "local-index"),
+ s: s,
+ options: model.MeasureQueryOptions{
+ Name: pkgschema.TopNSchemaName,
+ TimeRange: &tr,
+ Entities: entities,
+ TagProjection: []model.TagProjection{
+ {
+ Family: pkgschema.TopNTagFamily,
+ Names: pkgschema.TopNTagNames,
+ },
+ },
+ FieldProjection: fieldProjection,
+ },
}, nil
}
-func (uls *unresolvedLocalScan) locateEntity(entityList []string)
([]*modelv1.TagValue, error) {
+func (uls *unresolvedLocalScan) parseGroupByTags() ([]string, error) {
+ if len(uls.conditions) == 0 {
+ return nil, nil
+ }
entityMap := make(map[string]int)
- entity := make([]*modelv1.TagValue, len(entityList))
- for idx, tagName := range entityList {
+ entity := make([]string, len(uls.groupByTags))
+ for idx, tagName := range uls.groupByTags {
entityMap[tagName] = idx
- // allow to make fuzzy search with partial conditions
- entity[idx] = pbv1.AnyTagValue
}
+ parsed := 0
for _, pairQuery := range uls.conditions {
if pairQuery.GetOp() != modelv1.Condition_BINARY_OP_EQ {
return nil, errors.Errorf("tag belongs to the entity
only supports EQ operation in condition(%v)", pairQuery)
}
if entityIdx, ok := entityMap[pairQuery.GetName()]; ok {
- entity[entityIdx] = pairQuery.Value
switch pairQuery.GetValue().GetValue().(type) {
case *modelv1.TagValue_Str, *modelv1.TagValue_Int,
*modelv1.TagValue_Null:
- entity[entityIdx] = pairQuery.Value
+ entity[entityIdx] =
measure.Stringify(pairQuery.Value)
+ parsed++
default:
return nil, errors.New("unsupported condition
tag type for entity")
}
@@ -120,61 +130,37 @@ func (uls *unresolvedLocalScan) locateEntity(entityList
[]string) ([]*modelv1.Ta
}
return nil, errors.New("only groupBy tag name is supported")
}
+ if parsed != len(uls.groupByTags) {
+ return nil, errors.New("failed to parse all groupBy tags")
+ }
return entity, nil
}
-func local(startTime, endTime time.Time, metadata *commonv1.Metadata,
projectionTags [][]*logical.Tag,
- projectionFields []*logical.Field, conditions []*modelv1.Condition,
sort modelv1.Sort,
-) logical.UnresolvedPlan {
- return &unresolvedLocalScan{
- startTime: startTime,
- endTime: endTime,
- metadata: metadata,
- projectionTags: projectionTags,
- projectionFields: projectionFields,
- conditions: conditions,
- sort: sort,
- }
-}
-
var _ logical.Plan = (*localScan)(nil)
type localScan struct {
- schema logical.Schema
- metadata *commonv1.Metadata
- l *logger.Logger
- timeRange timestamp.TimeRange
- projectionTagsRefs [][]*logical.TagRef
- projectionFieldsRefs []*logical.FieldRef
- projectionTags []model.TagProjection
- projectionFields []string
- entity []*modelv1.TagValue
- sort modelv1.Sort
+ s logical.Schema
+ options model.MeasureQueryOptions
}
func (i *localScan) Execute(ctx context.Context) (mit executor.MIterator, err
error) {
ec := executor.FromMeasureExecutionContext(ctx)
- result, err := ec.Query(ctx, model.MeasureQueryOptions{
- Name: i.metadata.GetName(),
- TimeRange: &i.timeRange,
- Entities: [][]*modelv1.TagValue{i.entity},
- Order: &index.OrderBy{Sort: i.sort},
- TagProjection: i.projectionTags,
- FieldProjection: i.projectionFields,
- })
+ result, err := ec.Query(ctx, i.options)
if err != nil {
return nil, fmt.Errorf("failed to query measure: %w", err)
}
- return &resultMIterator{
- result: result,
+ return &topNMIterator{
+ result: result,
+ topNValue: measure.GenerateTopNValue(),
+ decoder: generateTopNValuesDecoder(),
}, nil
}
func (i *localScan) String() string {
- return fmt.Sprintf("IndexScan:
startTime=%d,endTime=%d,Metadata{group=%s,name=%s}; projection=%s; sort=%s;",
- i.timeRange.Start.Unix(), i.timeRange.End.Unix(),
i.metadata.GetGroup(), i.metadata.GetName(),
- logical.FormatTagRefs(", ", i.projectionTagsRefs...), i.sort)
+ return fmt.Sprintf("TopNAggScan: %s startTime=%d,endTime=%d,
entity=%s;",
+ i.options.Name, i.options.TimeRange.Start.Unix(),
i.options.TimeRange.End.Unix(),
+ i.options.Entities)
}
func (i *localScan) Children() []logical.Plan {
@@ -182,8 +168,101 @@ func (i *localScan) Children() []logical.Plan {
}
func (i *localScan) Schema() logical.Schema {
- if len(i.projectionTagsRefs) == 0 {
- return i.schema
+ return i.s
+}
+
+type topNMIterator struct {
+ result model.MeasureQueryResult
+ err error
+ topNValue *measure.TopNValue
+ decoder *encoding.BytesBlockDecoder
+ current []*measurev1.DataPoint
+}
+
+func (ei *topNMIterator) Next() bool {
+ if ei.result == nil {
+ return false
}
- return
i.schema.ProjTags(i.projectionTagsRefs...).ProjFields(i.projectionFieldsRefs...)
+
+ r := ei.result.Pull()
+ if r == nil {
+ return false
+ }
+ if r.Error != nil {
+ ei.err = r.Error
+ return false
+ }
+ ei.current = ei.current[:0]
+
+ for i := range r.Timestamps {
+ fv := r.Fields[0].Values[i]
+ bd := fv.GetBinaryData()
+ if bd == nil {
+ ei.err = errors.New("failed to get binary data")
+ return false
+ }
+ ei.topNValue.Reset()
+ ei.err = ei.topNValue.Unmarshal(bd, ei.decoder)
+ if ei.err != nil {
+ return false
+ }
+ fieldName, entityNames, values, entities :=
ei.topNValue.Values()
+ for j := range entities {
+ dp := &measurev1.DataPoint{
+ Timestamp: timestamppb.New(time.Unix(0,
r.Timestamps[i])),
+ Sid: uint64(r.SID),
+ Version: r.Versions[i],
+ }
+ tagFamily := &modelv1.TagFamily{
+ Name: pkgschema.TopNTagFamily,
+ }
+ dp.TagFamilies = append(dp.TagFamilies, tagFamily)
+ for k, entityName := range entityNames {
+ tagFamily.Tags = append(tagFamily.Tags,
&modelv1.Tag{
+ Key: entityName,
+ Value: entities[j][k],
+ })
+ }
+ dp.Fields = append(dp.Fields,
&measurev1.DataPoint_Field{
+ Name: fieldName,
+ Value: &modelv1.FieldValue{
+ Value: &modelv1.FieldValue_Int{
+ Int: &modelv1.Int{
+ Value: values[j],
+ },
+ },
+ },
+ })
+ ei.current = append(ei.current, dp)
+ }
+ }
+ return true
}
+
+func (ei *topNMIterator) Current() []*measurev1.DataPoint {
+ return ei.current
+}
+
+func (ei *topNMIterator) Close() error {
+ if ei.result != nil {
+ ei.result.Release()
+ }
+ releaseTopNValuesDecoder(ei.decoder)
+ measure.ReleaseTopNValue(ei.topNValue)
+ return ei.err
+}
+
+func generateTopNValuesDecoder() *encoding.BytesBlockDecoder {
+ v := topNValuesDecoderPool.Get()
+ if v == nil {
+ return &encoding.BytesBlockDecoder{}
+ }
+ return v
+}
+
+func releaseTopNValuesDecoder(d *encoding.BytesBlockDecoder) {
+ d.Reset()
+ topNValuesDecoderPool.Put(d)
+}
+
+var topNValuesDecoderPool =
pool.Register[*encoding.BytesBlockDecoder]("topn-valueDecoder")
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 9abe21ec..7fcd31f4 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -203,7 +203,7 @@ func (sr *schemaRepo) Watcher() {
err =
sr.initResource(evt.Metadata.GetMetadata())
case EventKindTopNAgg:
topNSchema :=
evt.Metadata.(*databasev1.TopNAggregation)
- _, err =
createOrUpdateTopNMeasure(context.Background(), sr.metadata.MeasureRegistry(),
topNSchema)
+ err =
createTopNResultMeasure(context.Background(), sr.metadata.MeasureRegistry(),
topNSchema.GetMetadata().Group)
if err != nil {
break
}
@@ -217,10 +217,7 @@ func (sr *schemaRepo) Watcher() {
err =
sr.deleteResource(evt.Metadata.GetMetadata())
case EventKindTopNAgg:
topNSchema :=
evt.Metadata.(*databasev1.TopNAggregation)
- err = multierr.Combine(
-
sr.deleteResource(topNSchema.SourceMeasure),
-
sr.initResource(topNSchema.SourceMeasure),
- )
+ err =
sr.initResource(topNSchema.SourceMeasure)
}
}
if err != nil && !errors.Is(err,
schema.ErrClosed) {
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index a158daab..5050e4bc 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -22,9 +22,7 @@ import (
"fmt"
"time"
- "github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
- "google.golang.org/protobuf/testing/protocmp"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -32,16 +30,23 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
)
-const topNTagFamily = "__topN__"
+const (
+ // TopNTagFamily is the tag family name of the topN result measure.
+ TopNTagFamily = "_topN"
+ // TopNFieldName is the field name of the topN result measure.
+ TopNFieldName = "value"
+)
var (
- initTimeout = 10 * time.Second
- topNValueFieldSpec = &databasev1.FieldSpec{
- Name: "value",
- FieldType: databasev1.FieldType_FIELD_TYPE_INT,
+ initTimeout = 10 * time.Second
+ topNFieldsSpec = []*databasev1.FieldSpec{{
+ Name: TopNFieldName,
+ FieldType: databasev1.FieldType_FIELD_TYPE_DATA_BINARY,
EncodingMethod:
databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
CompressionMethod:
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
- }
+ }}
+ // TopNTagNames is the tag names of the topN result measure.
+ TopNTagNames = []string{"name", "direction", "group"}
)
type revisionContext struct {
@@ -154,6 +159,12 @@ func (sr *schemaRepo) getRules(ctx context.Context, gName
string) map[string]*da
func (sr *schemaRepo) processMeasure(ctx context.Context, gName string, group
*group, bindings map[string][]string, rules map[string]*databasev1.IndexRule) {
aggMap := sr.getAggMap(ctx, gName)
+ if len(aggMap) > 0 {
+ if err := createTopNResultMeasure(ctx,
sr.metadata.MeasureRegistry(), gName); err != nil {
+ logger.Panicf("fails to create the topN result measure:
%v", err)
+ return
+ }
+ }
ctx, cancel := context.WithTimeout(ctx, initTimeout)
defer cancel()
@@ -163,30 +174,8 @@ func (sr *schemaRepo) processMeasure(ctx context.Context,
gName string, group *g
logger.Panicf("fails to get the measures: %v", err)
return
}
- revCtx := ctx.Value(revCtxKey).(*revisionContext)
- measureMap := make(map[string]*databasev1.Measure, len(mm))
- for _, m := range mm {
- measureMap[m.Metadata.Name] = m
- }
- for _, aa := range aggMap {
- for _, a := range aa {
- sourceMeasre, ok := measureMap[a.SourceMeasure.Name]
- if !ok {
- sr.l.Warn().Str("group", gName).Str("measure",
a.SourceMeasure.Name).Str("agg", a.Metadata.Name).Msg("source measure not
found")
- continue
- }
- if _, ok := measureMap[a.Metadata.Name]; !ok {
- sr.l.Info().Str("group", gName).Str("measure",
a.SourceMeasure.Name).Str("agg", a.Metadata.Name).Msg("remove topN aggregation")
- m, err := createTopNMeasure(ctx,
sr.metadata.MeasureRegistry(), a, sourceMeasre)
- if err != nil {
- logger.Panicf("fails to create or
update the topN measure: %v", err)
- return
- }
- mm = append(mm, m)
- }
- }
- }
+ revCtx := ctx.Value(revCtxKey).(*revisionContext)
for _, m := range mm {
if m.Metadata.ModRevision > revCtx.measure {
revCtx.measure = m.Metadata.ModRevision
@@ -228,7 +217,9 @@ func (sr *schemaRepo) storeMeasure(m *databasev1.Measure,
group *group, bindings
var indexRules []*databasev1.IndexRule
if rr, ok := bindings[m.Metadata.GetName()]; ok {
for _, r := range rr {
- indexRules = append(indexRules, rules[r])
+ if rule, ok := rules[r]; ok {
+ indexRules = append(indexRules, rule)
+ }
}
}
var topNAggr []*databasev1.TopNAggregation
@@ -263,7 +254,9 @@ func (sr *schemaRepo) storeStream(s *databasev1.Stream,
group *group, bindings m
var indexRules []*databasev1.IndexRule
if rr, ok := bindings[s.Metadata.GetName()]; ok {
for _, r := range rr {
- indexRules = append(indexRules, rules[r])
+ if rule, ok := rules[r]; ok {
+ indexRules = append(indexRules, rule)
+ }
}
}
if err := sr.storeResource(group, s, indexRules, nil); err != nil {
@@ -284,107 +277,50 @@ func (sr *schemaRepo) initGroup(groupSchema
*commonv1.Group) (*group, error) {
return g, nil
}
-func createOrUpdateTopNMeasure(ctx context.Context, measureSchemaRegistry
schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure,
error) {
- oldTopNSchema, err := measureSchemaRegistry.GetMeasure(ctx,
topNSchema.GetMetadata())
+func createTopNResultMeasure(ctx context.Context, measureSchemaRegistry
schema.Measure, group string) error {
+ md := GetTopNSchemaMetadata(group)
+ m, err := measureSchemaRegistry.GetMeasure(ctx, md)
if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
- return nil, errors.WithMessagef(err, "fail to get current topN
measure %s", topNSchema.GetMetadata().GetName())
+ return errors.WithMessagef(err, "fail to get %s", md)
}
-
- sourceMeasureSchema, err := measureSchemaRegistry.GetMeasure(ctx,
topNSchema.GetSourceMeasure())
- if err != nil {
- return nil, errors.WithMessagef(err, "fail to get source
measure %s", topNSchema.GetSourceMeasure().GetName())
+ if m != nil {
+ return nil
}
- // create a new "derived" measure for TopN result
- newTopNMeasure, err := buildTopNSourceMeasure(topNSchema,
sourceMeasureSchema)
- if err != nil {
- return nil, err
- }
- if oldTopNSchema == nil {
- if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx,
newTopNMeasure); innerErr != nil {
- if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) {
- return nil, errors.WithMessagef(innerErr, "fail
to create new topN measure %s", newTopNMeasure.GetMetadata().GetName())
- }
- newTopNMeasure, err =
measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetMetadata())
- if err != nil {
- return nil, errors.WithMessagef(err, "fail to
get created topN measure %s", topNSchema.GetMetadata().GetName())
- }
+ m = GetTopNSchema(md)
+ if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, m); innerErr
!= nil {
+ if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) {
+ return errors.WithMessagef(innerErr, "fail to create
new topN measure %s", m)
}
- return newTopNMeasure, nil
- }
- // compare with the old one
- if cmp.Diff(newTopNMeasure, oldTopNSchema,
- protocmp.IgnoreUnknown(),
- protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"),
- protocmp.IgnoreFields(&commonv1.Metadata{}, "id",
"create_revision", "mod_revision"),
- protocmp.Transform()) == "" {
- return oldTopNSchema, nil
- }
- // update
- if _, err = measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure);
err != nil {
- return nil, errors.WithMessagef(err, "fail to update topN
measure %s", newTopNMeasure.GetMetadata().GetName())
- }
- return newTopNMeasure, nil
-}
-
-func createTopNMeasure(ctx context.Context, measureSchemaRegistry
schema.Measure, topNSchema *databasev1.TopNAggregation,
- sourceMeasureSchema *databasev1.Measure,
-) (*databasev1.Measure, error) {
- newTopNMeasure, err := buildTopNSourceMeasure(topNSchema,
sourceMeasureSchema)
- if err != nil {
- return nil, err
- }
- if _, err := measureSchemaRegistry.CreateMeasure(ctx, newTopNMeasure);
err != nil {
- return nil, err
}
- return newTopNMeasure, nil
+ return nil
}
-func buildTopNSourceMeasure(topNSchema *databasev1.TopNAggregation,
sourceMeasureSchema *databasev1.Measure) (*databasev1.Measure, error) {
- tagNames := sourceMeasureSchema.GetEntity().GetTagNames()
- seriesSpecs := make([]*databasev1.TagSpec, 0, len(tagNames))
-
- for _, tagName := range tagNames {
- var found bool
- for _, fSpec := range sourceMeasureSchema.GetTagFamilies() {
- for _, tSpec := range fSpec.GetTags() {
- if tSpec.GetName() == tagName {
- seriesSpecs = append(seriesSpecs, tSpec)
- found = true
- goto CHECK
- }
- }
- }
-
- CHECK:
- if !found {
- return nil, fmt.Errorf("fail to find tag spec %s",
tagName)
- }
- }
- // create a new "derived" measure for TopN result
+// GetTopNSchema returns the schema of the topN result measure.
+func GetTopNSchema(md *commonv1.Metadata) *databasev1.Measure {
return &databasev1.Measure{
- Metadata: topNSchema.Metadata,
- Interval: sourceMeasureSchema.GetInterval(),
+ Metadata: md,
TagFamilies: []*databasev1.TagFamilySpec{
{
- Name: topNTagFamily,
- Tags: append(seriesSpecs,
- &databasev1.TagSpec{
- Name: "sortDirection",
- Type:
databasev1.TagType_TAG_TYPE_INT,
- },
- &databasev1.TagSpec{
- Name: "rankNumber",
- Type:
databasev1.TagType_TAG_TYPE_INT,
- },
- ),
+ Name: TopNTagFamily,
+ Tags: []*databasev1.TagSpec{
+ {Name: TopNTagNames[0], Type:
databasev1.TagType_TAG_TYPE_STRING},
+ {Name: TopNTagNames[1], Type:
databasev1.TagType_TAG_TYPE_INT},
+ {Name: TopNTagNames[2], Type:
databasev1.TagType_TAG_TYPE_STRING},
+ },
},
},
- Fields: []*databasev1.FieldSpec{topNValueFieldSpec},
+ Fields: topNFieldsSpec,
Entity: &databasev1.Entity{
- TagNames: append(topNSchema.GetGroupByTagNames(),
- "sortDirection",
- "rankNumber"),
+ TagNames: TopNTagNames,
},
- }, nil
+ }
+}
+
+// GetTopNSchemaMetadata returns the metadata of the topN result measure.
+func GetTopNSchemaMetadata(group string) *commonv1.Metadata {
+ return &commonv1.Metadata{
+ Name: TopNSchemaName,
+ Group: group,
+ }
}
diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go
index 2ac35232..6ee3c2d0 100644
--- a/pkg/schema/schema.go
+++ b/pkg/schema/schema.go
@@ -26,6 +26,9 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
)
+// TopNSchemaName is the name of the top n result schema.
+const TopNSchemaName = "_top_n_result"
+
// EventType defines actions of events.
type EventType uint8
diff --git a/test/cases/topn/data/want/aggr_desc.yaml
b/test/cases/topn/data/want/aggr_desc.yaml
index b39f77e7..27895621 100644
--- a/test/cases/topn/data/want/aggr_desc.yaml
+++ b/test/cases/topn/data/want/aggr_desc.yaml
@@ -44,12 +44,11 @@ lists:
- entity:
- key: service_id
value:
- str:
- value: ""
+ "null": null
- key: entity_id
value:
str:
value: entity_2
value:
int:
- value: "10"
\ No newline at end of file
+ value: "10"