This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new a63ae2a3 Improve groupBy, orderBy and multi-group queries (#904)
a63ae2a3 is described below
commit a63ae2a378a462b05bc7bc422c2b4ac2635ecd6c
Author: Huang Youliang <[email protected]>
AuthorDate: Tue Dec 30 10:34:09 2025 +0800
Improve groupBy, orderBy and multi-group queries (#904)
---
banyand/measure/block.go | 6 +++---
banyand/measure/query_test.go | 10 ++++++----
banyand/measure/tstable_test.go | 10 ++++++++++
banyand/stream/block.go | 4 ++--
banyand/stream/query.go | 15 +++++++++------
banyand/stream/query_by_idx.go | 1 +
banyand/trace/block.go | 2 +-
banyand/trace/query.go | 10 ++++------
banyand/trace/query_test.go | 8 ++++++--
banyand/trace/tstable_test.go | 6 ++++++
pkg/query/logical/measure/measure_analyzer.go | 6 +-----
pkg/query/logical/measure/measure_plan_groupby.go | 3 +++
pkg/query/logical/measure/schema.go | 17 +++++++++++++----
pkg/query/logical/plan.go | 15 ++++-----------
pkg/query/logical/schema.go | 23 ++++++++++++-----------
pkg/query/logical/trace/schema.go | 13 +++++++++++++
16 files changed, 94 insertions(+), 55 deletions(-)
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 2523ee71..59f50241 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -549,7 +549,7 @@ OUTER:
for i := range cf.columns {
if cf.columns[i].name == tagName {
schemaType, hasSchemaType :=
bc.schemaTagTypes[tagName]
- if !hasSchemaType ||
cf.columns[i].valueType == schemaType {
+ if hasSchemaType &&
cf.columns[i].valueType == schemaType {
for _, v := range
cf.columns[i].values[idx:offset] {
t.Values =
append(t.Values, mustDecodeTagValue(cf.columns[i].valueType, v))
}
@@ -638,7 +638,7 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult,
storedIndexValue map[commo
for _, c := range cf.columns {
if c.name == tagName {
schemaType, hasSchemaType :=
bc.schemaTagTypes[tagName]
- if !hasSchemaType || c.valueType ==
schemaType {
+ if hasSchemaType && c.valueType ==
schemaType {
r.TagFamilies[i].Tags[j].Values
= append(r.TagFamilies[i].Tags[j].Values, mustDecodeTagValue(c.valueType,
c.values[bc.idx]))
} else {
r.TagFamilies[i].Tags[j].Values
= append(r.TagFamilies[i].Tags[j].Values, pbv1.NullTagValue)
@@ -694,7 +694,7 @@ func (bc *blockCursor) replace(r *model.MeasureResult,
storedIndexValue map[comm
for _, c := range cf.columns {
if c.name == tagName {
schemaType, hasSchemaType :=
bc.schemaTagTypes[tagName]
- if !hasSchemaType || c.valueType ==
schemaType {
+ if hasSchemaType && c.valueType ==
schemaType {
r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] =
mustDecodeTagValue(c.valueType, c.values[bc.idx])
} else {
r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] =
pbv1.NullTagValue
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 17ca9c37..18851e8d 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -1238,8 +1238,9 @@ func TestQueryResult(t *testing.T) {
verify := func(t *testing.T, tst *tsTable) {
defer tst.Close()
queryOpts := queryOptions{
- minTimestamp: tt.minTimestamp,
- maxTimestamp: tt.maxTimestamp,
+ schemaTagTypes: testSchemaTagTypes,
+ minTimestamp: tt.minTimestamp,
+ maxTimestamp: tt.maxTimestamp,
}
s := tst.currentSnapshot()
require.NotNil(t, s)
@@ -1467,8 +1468,9 @@ func TestQueryResult_QuotaExceeded(t *testing.T) {
}
defer tst.Close()
queryOpts := queryOptions{
- minTimestamp: tt.minTimestamp,
- maxTimestamp: tt.maxTimestamp,
+ schemaTagTypes: testSchemaTagTypes,
+ minTimestamp: tt.minTimestamp,
+ maxTimestamp: tt.maxTimestamp,
}
queryOpts.TagProjection = tagProjections[1]
queryOpts.FieldProjection = fieldProjections[1]
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index e6ae63d8..afb44b91 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -282,6 +282,16 @@ var allTagProjections = []model.TagProjection{
{Family: "singleTag", Names: []string{"strTag", "intTag", "strTag1",
"strTag2"}},
}
+var testSchemaTagTypes = map[string]pbv1.ValueType{
+ "strArrTag": pbv1.ValueTypeStrArr,
+ "intArrTag": pbv1.ValueTypeInt64Arr,
+ "binaryTag": pbv1.ValueTypeBinaryData,
+ "strTag": pbv1.ValueTypeStr,
+ "intTag": pbv1.ValueTypeInt64,
+ "strTag1": pbv1.ValueTypeStr,
+ "strTag2": pbv1.ValueTypeStr,
+}
+
var fieldProjections = map[int][]string{
1: {"strField", "intField", "floatField", "binaryField"},
3: {"intField"},
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 24484800..f167c603 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -491,7 +491,7 @@ func (bc *blockCursor) copyAllTo(r *model.StreamResult,
desc bool) {
schemaType, hasSchemaType := bc.schemaTagTypes[c.name]
for k := start; k < end; k++ {
if len(c.values) > k {
- if !hasSchemaType || c.valueType ==
schemaType {
+ if hasSchemaType && c.valueType ==
schemaType {
values[k-start] =
mustDecodeTagValue(c.valueType, c.values[k])
} else {
values[k-start] =
pbv1.NullTagValue
@@ -536,7 +536,7 @@ func (bc *blockCursor) copyTo(r *model.StreamResult) {
for i2, c := range cf.tags {
schemaType, hasSchemaType := bc.schemaTagTypes[c.name]
if len(c.values) > bc.idx {
- if !hasSchemaType || c.valueType == schemaType {
+ if hasSchemaType && c.valueType == schemaType {
r.TagFamilies[i].Tags[i2].Values =
append(r.TagFamilies[i].Tags[i2].Values, mustDecodeTagValue(c.valueType,
c.values[bc.idx]))
} else {
r.TagFamilies[i].Tags[i2].Values =
append(r.TagFamilies[i].Tags[i2].Values, pbv1.NullTagValue)
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index b42e2c2b..1fd65bad 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -69,11 +69,11 @@ func (s *stream) Query(ctx context.Context, sqo
model.StreamQueryOptions) (sqr m
series := prepareSeriesData(sqo)
schemaTagTypes := make(map[string]pbv1.ValueType)
- if is := s.indexSchema.Load(); is != nil {
- for name, spec := range is.(indexSchema).tagMap {
- vt := pbv1.TagValueSpecToValueType(spec.GetType())
+ for _, tf := range s.schema.GetTagFamilies() {
+ for _, tag := range tf.GetTags() {
+ vt := pbv1.TagValueSpecToValueType(tag.GetType())
if vt != pbv1.ValueTypeUnknown {
- schemaTagTypes[name] = vt
+ schemaTagTypes[tag.GetName()] = vt
}
}
}
@@ -85,7 +85,7 @@ func (s *stream) Query(ctx context.Context, sqo
model.StreamQueryOptions) (sqr m
return s.executeTimeSeriesQuery(segments, series, qo, &tr), nil
}
- return s.executeIndexedQuery(ctx, segments, series, sqo, &tr)
+ return s.executeIndexedQuery(ctx, segments, series, sqo,
schemaTagTypes, &tr)
}
func validateQueryInput(sqo model.StreamQueryOptions) error {
@@ -165,9 +165,10 @@ func (s *stream) executeIndexedQuery(
segments []storage.Segment[*tsTable, option],
series []*pbv1.Series,
sqo model.StreamQueryOptions,
+ schemaTagTypes map[string]pbv1.ValueType,
tr *index.RangeOpts,
) (model.StreamQueryResult, error) {
- result, seriesFilter, resultTS, err :=
s.processSegmentsAndBuildFilters(ctx, segments, series, sqo, tr)
+ result, seriesFilter, resultTS, err :=
s.processSegmentsAndBuildFilters(ctx, segments, series, sqo, schemaTagTypes, tr)
if err != nil {
return nil, err
}
@@ -205,6 +206,7 @@ func (s *stream) processSegmentsAndBuildFilters(
segments []storage.Segment[*tsTable, option],
series []*pbv1.Series,
sqo model.StreamQueryOptions,
+ schemaTagTypes map[string]pbv1.ValueType,
tr *index.RangeOpts,
) (idxResult, posting.List, posting.List, error) {
var result idxResult
@@ -213,6 +215,7 @@ func (s *stream) processSegmentsAndBuildFilters(
result.sm = s
result.qo = queryOptions{
StreamQueryOptions: sqo,
+ schemaTagTypes: schemaTagTypes,
seriesToEntity:
make(map[common.SeriesID][]*modelv1.TagValue),
}
diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go
index 0baadbad..df43d660 100644
--- a/banyand/stream/query_by_idx.go
+++ b/banyand/stream/query_by_idx.go
@@ -191,6 +191,7 @@ func (qr *idxResult) loadSortingData(ctx context.Context)
*model.StreamResult {
qo.StreamQueryOptions = qr.qo.StreamQueryOptions
qo.elementFilter = roaring.NewPostingList()
qo.seriesToEntity = qr.qo.seriesToEntity
+ qo.schemaTagTypes = qr.qo.schemaTagTypes
qr.elementIDsSorted = qr.elementIDsSorted[:0]
count, searchedSize := 1, 0
tracer := query.GetTracer(ctx)
diff --git a/banyand/trace/block.go b/banyand/trace/block.go
index 25c9a53c..b4e05733 100644
--- a/banyand/trace/block.go
+++ b/banyand/trace/block.go
@@ -468,7 +468,7 @@ func (bc *blockCursor) copyAllTo(r *model.TraceResult) {
schemaType, hasSchemaType := bc.schemaTagTypes[t.name]
for k := range bc.spans {
if len(t.values) > k {
- if !hasSchemaType || t.valueType == schemaType {
+ if hasSchemaType && t.valueType == schemaType {
values[k] =
mustDecodeTagValue(t.valueType, t.values[k])
} else {
values[k] = pbv1.NullTagValue
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index a675f1f0..5fdbb6d5 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -84,12 +84,10 @@ func (t *trace) Query(ctx context.Context, tqo
model.TraceQueryOptions) (model.T
sort.Strings(tqo.TraceIDs)
schemaTagTypes := make(map[string]pbv1.ValueType)
- if is := t.indexSchema.Load(); is != nil {
- for name, spec := range is.(indexSchema).tagMap {
- vt := pbv1.TagValueSpecToValueType(spec.GetType())
- if vt != pbv1.ValueTypeUnknown {
- schemaTagTypes[name] = vt
- }
+ for _, tag := range t.schema.GetTags() {
+ vt := pbv1.TagValueSpecToValueType(tag.GetType())
+ if vt != pbv1.ValueTypeUnknown {
+ schemaTagTypes[tag.GetName()] = vt
}
}
diff --git a/banyand/trace/query_test.go b/banyand/trace/query_test.go
index bff0798d..f318d508 100644
--- a/banyand/trace/query_test.go
+++ b/banyand/trace/query_test.go
@@ -91,7 +91,9 @@ func TestQueryResult(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
verify := func(t *testing.T, tst *tsTable) {
defer tst.Close()
- queryOpts := queryOptions{}
+ queryOpts := queryOptions{
+ schemaTagTypes: testSchemaTagTypes,
+ }
s := tst.currentSnapshot()
require.NotNil(t, s)
defer s.decRef()
@@ -338,7 +340,9 @@ func TestQueryResultMultipleBatches(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
verify := func(t *testing.T, tst *tsTable) {
defer tst.Close()
- queryOpts := queryOptions{}
+ queryOpts := queryOptions{
+ schemaTagTypes: testSchemaTagTypes,
+ }
s := tst.currentSnapshot()
require.NotNil(t, s)
defer s.decRef()
diff --git a/banyand/trace/tstable_test.go b/banyand/trace/tstable_test.go
index 73d072b3..d6a3cdf1 100644
--- a/banyand/trace/tstable_test.go
+++ b/banyand/trace/tstable_test.go
@@ -229,6 +229,12 @@ var allTagProjections = &model.TagProjection{
Names: []string{"strArrTag", "strTag", "intTag"},
}
+var testSchemaTagTypes = map[string]pbv1.ValueType{
+ "strArrTag": pbv1.ValueTypeStrArr,
+ "strTag": pbv1.ValueTypeStr,
+ "intTag": pbv1.ValueTypeInt64,
+}
+
var tsTS1 = &traces{
traceIDs: []string{"trace1", "trace2", "trace3"},
timestamps: []int64{1, 1, 1},
diff --git a/pkg/query/logical/measure/measure_analyzer.go
b/pkg/query/logical/measure/measure_analyzer.go
index 62a25b26..8932006c 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -101,11 +101,7 @@ func Analyze(criteria *measurev1.QueryRequest, metadata
[]*commonv1.Metadata, ss
}
fieldProjection := criteria.GetFieldProjection().GetNames()
if len(fieldProjection) > 0 {
- projFields := make([]*logical.Field, len(fieldProjection))
- for i, fieldName := range fieldProjection {
- projFields[i] = logical.NewField(fieldName)
- }
- if err := ms.ValidateProjectionFields(projFields...); err !=
nil {
+ if err := ms.ValidateProjectionFields(fieldProjection...); err
!= nil {
return nil, err
}
}
diff --git a/pkg/query/logical/measure/measure_plan_groupby.go
b/pkg/query/logical/measure/measure_plan_groupby.go
index 06ee92ef..6584fd71 100644
--- a/pkg/query/logical/measure/measure_plan_groupby.go
+++ b/pkg/query/logical/measure/measure_plan_groupby.go
@@ -64,6 +64,9 @@ func (gba *unresolvedGroup) Analyze(measureSchema
logical.Schema) (logical.Plan,
if err != nil {
return nil, err
}
+ if len(groupByTagRefs) == 0 {
+ return nil, errors.Wrap(logical.ErrTagNotDefined, "groupBy
schema")
+ }
return &groupBy{
Parent: &logical.Parent{
UnresolvedInput: gba.unresolvedInput,
diff --git a/pkg/query/logical/measure/schema.go
b/pkg/query/logical/measure/schema.go
index badc9ceb..3bdc81ca 100644
--- a/pkg/query/logical/measure/schema.go
+++ b/pkg/query/logical/measure/schema.go
@@ -67,10 +67,10 @@ func (m *schema) CreateFieldRef(fields ...*logical.Field)
([]*logical.FieldRef,
}
// ValidateProjectionFields checks if all fields in the projection exist in
the schema.
-func (m *schema) ValidateProjectionFields(fields ...*logical.Field) error {
+func (m *schema) ValidateProjectionFields(fields ...string) error {
for _, field := range fields {
- if _, ok := m.fieldMap[field.Name]; !ok {
- return errors.Errorf("field %s not found in schema",
field.Name)
+ if _, ok := m.fieldMap[field]; !ok {
+ return errors.Errorf("field %s not found in schema",
field)
}
}
return nil
@@ -154,7 +154,16 @@ func mergeSchema(schemas []logical.Schema)
(logical.Schema, error) {
for name, spec := range mSchema.fieldMap {
if existing, exists := fieldMap[name]; exists {
if existing.Spec.FieldType !=
spec.Spec.FieldType {
- existing.Spec.FieldType =
databasev1.FieldType_FIELD_TYPE_UNSPECIFIED
+ // Create a copy to avoid modifying the
original schema.
+ fieldMap[name] = &logical.FieldSpec{
+ FieldIdx: existing.FieldIdx,
+ Spec: &databasev1.FieldSpec{
+ Name:
existing.Spec.Name,
+ FieldType:
databasev1.FieldType_FIELD_TYPE_UNSPECIFIED,
+ EncodingMethod:
existing.Spec.EncodingMethod,
+ CompressionMethod:
existing.Spec.CompressionMethod,
+ },
+ }
}
} else {
fieldMap[name] = &logical.FieldSpec{
diff --git a/pkg/query/logical/plan.go b/pkg/query/logical/plan.go
index 35821d0a..d9c9147a 100644
--- a/pkg/query/logical/plan.go
+++ b/pkg/query/logical/plan.go
@@ -34,9 +34,8 @@ type Parent struct {
// OrderBy is the sorting operator.
type OrderBy struct {
- Index *databasev1.IndexRule
- fieldRefs []*TagRef
- Sort modelv1.Sort
+ Index *databasev1.IndexRule
+ Sort modelv1.Sort
}
// Equal reports whether o and other has the same sorting order and name.
@@ -79,14 +78,8 @@ func ParseOrderBy(s Schema, indexRuleName string, sort
modelv1.Sort) (*OrderBy,
return nil, errors.Wrap(errIndexSortingUnsupported,
indexRuleName)
}
- projFieldSpecs, err := s.CreateTagRef(NewTags("",
indexRule.GetTags()...))
- if err != nil {
- return nil, errors.Wrap(ErrTagNotDefined, indexRuleName)
- }
-
return &OrderBy{
- Sort: sort,
- Index: indexRule,
- fieldRefs: projFieldSpecs[0],
+ Sort: sort,
+ Index: indexRule,
}, nil
}
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 51a59d04..d9ae7dcc 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -213,13 +213,6 @@ func MergeSchemas(schemas []*CommonSchema) (*CommonSchema,
error) {
indexRuleMap := make(map[string]*databasev1.IndexRule)
for _, s := range schemas {
- for tagName, tagSpec := range s.TagSpecMap {
- if existingSpec, exists := merged.TagSpecMap[tagName];
!exists {
- merged.TagSpecMap[tagName] = tagSpec
- } else if existingSpec.Spec.Type != tagSpec.Spec.Type {
- existingSpec.Spec.Type =
databasev1.TagType_TAG_TYPE_UNSPECIFIED
- }
- }
for _, rule := range s.IndexRules {
if existedRule := indexRuleMap[rule.Metadata.Name];
existedRule == nil {
indexRuleMap[rule.Metadata.Name] = rule
@@ -235,7 +228,8 @@ func MergeSchemas(schemas []*CommonSchema) (*CommonSchema,
error) {
return merged, nil
}
-func mergeTagSpecs(dst, src []*databasev1.TagSpec) []*databasev1.TagSpec {
+// MergeTagSpecs merges two slices of TagSpec.
+func MergeTagSpecs(dst, src []*databasev1.TagSpec) []*databasev1.TagSpec {
res := make([]*databasev1.TagSpec, 0, len(dst)+len(src))
res = append(res, dst...)
for _, s := range src {
@@ -244,8 +238,11 @@ func mergeTagSpecs(dst, src []*databasev1.TagSpec)
[]*databasev1.TagSpec {
if d.Name == s.Name {
if d.Type != s.Type {
// If the type is different, the tag
spec is not compatible.
- // We need to set the type to
unspecifed.
- res[i].Type =
databasev1.TagType_TAG_TYPE_UNSPECIFIED
+ // Create a copy with type set to
unspecified to avoid modifying the original schema.
+ res[i] = &databasev1.TagSpec{
+ Name: d.Name,
+ Type:
databasev1.TagType_TAG_TYPE_UNSPECIFIED,
+ }
}
found = true
break
@@ -266,7 +263,11 @@ func MergeTagFamilySpecs(dst []*databasev1.TagFamilySpec,
src []*databasev1.TagF
found := false
for i, d := range dst {
if d.Name == s.Name {
- res[i].Tags = mergeTagSpecs(d.Tags, s.Tags)
+ // Create a copy to avoid modifying the
original schema.
+ res[i] = &databasev1.TagFamilySpec{
+ Name: d.Name,
+ Tags: MergeTagSpecs(d.Tags, s.Tags),
+ }
found = true
break
}
diff --git a/pkg/query/logical/trace/schema.go
b/pkg/query/logical/trace/schema.go
index 59c2d54e..b0475764 100644
--- a/pkg/query/logical/trace/schema.go
+++ b/pkg/query/logical/trace/schema.go
@@ -121,6 +121,7 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema,
error) {
return schemas[0], nil
}
var commonSchemas []*logical.CommonSchema
+ var allTags []*databasev1.TagSpec
for _, sm := range schemas {
if sm == nil {
continue
@@ -129,6 +130,15 @@ func mergeSchema(schemas []logical.Schema)
(logical.Schema, error) {
if s == nil {
continue
}
+ traceTags := s.trace.GetTags()
+ tagSpecs := make([]*databasev1.TagSpec, len(traceTags))
+ for i, tt := range traceTags {
+ tagSpecs[i] = &databasev1.TagSpec{
+ Name: tt.GetName(),
+ Type: tt.GetType(),
+ }
+ }
+ allTags = logical.MergeTagSpecs(allTags, tagSpecs)
commonSchemas = append(commonSchemas, s.common)
}
merged, err := logical.MergeSchemas(commonSchemas)
@@ -139,5 +149,8 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema,
error) {
common: merged,
children: schemas,
}
+ for i, tag := range allTags {
+ ret.common.RegisterTag(0, i, tag)
+ }
return ret, nil
}