This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new a63ae2a3 Improve groupBy, orderBy and multi-group queries (#904)
a63ae2a3 is described below

commit a63ae2a378a462b05bc7bc422c2b4ac2635ecd6c
Author: Huang Youliang <[email protected]>
AuthorDate: Tue Dec 30 10:34:09 2025 +0800

    Improve groupBy, orderBy and multi-group queries (#904)
---
 banyand/measure/block.go                          |  6 +++---
 banyand/measure/query_test.go                     | 10 ++++++----
 banyand/measure/tstable_test.go                   | 10 ++++++++++
 banyand/stream/block.go                           |  4 ++--
 banyand/stream/query.go                           | 15 +++++++++------
 banyand/stream/query_by_idx.go                    |  1 +
 banyand/trace/block.go                            |  2 +-
 banyand/trace/query.go                            | 10 ++++------
 banyand/trace/query_test.go                       |  8 ++++++--
 banyand/trace/tstable_test.go                     |  6 ++++++
 pkg/query/logical/measure/measure_analyzer.go     |  6 +-----
 pkg/query/logical/measure/measure_plan_groupby.go |  3 +++
 pkg/query/logical/measure/schema.go               | 17 +++++++++++++----
 pkg/query/logical/plan.go                         | 15 ++++-----------
 pkg/query/logical/schema.go                       | 23 ++++++++++++-----------
 pkg/query/logical/trace/schema.go                 | 13 +++++++++++++
 16 files changed, 94 insertions(+), 55 deletions(-)

diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 2523ee71..59f50241 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -549,7 +549,7 @@ OUTER:
                        for i := range cf.columns {
                                if cf.columns[i].name == tagName {
                                        schemaType, hasSchemaType := bc.schemaTagTypes[tagName]
-                                       if !hasSchemaType || cf.columns[i].valueType == schemaType {
+                                       if hasSchemaType && cf.columns[i].valueType == schemaType {
                                                for _, v := range cf.columns[i].values[idx:offset] {
                                                        t.Values = append(t.Values, mustDecodeTagValue(cf.columns[i].valueType, v))
                                                }
@@ -638,7 +638,7 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult, storedIndexValue map[commo
                        for _, c := range cf.columns {
                                if c.name == tagName {
                                        schemaType, hasSchemaType := bc.schemaTagTypes[tagName]
-                                       if !hasSchemaType || c.valueType == schemaType {
+                                       if hasSchemaType && c.valueType == schemaType {
                                                r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, mustDecodeTagValue(c.valueType, c.values[bc.idx]))
                                        } else {
                                                r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, pbv1.NullTagValue)
@@ -694,7 +694,7 @@ func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue map[comm
                        for _, c := range cf.columns {
                                if c.name == tagName {
                                        schemaType, hasSchemaType := bc.schemaTagTypes[tagName]
-                                       if !hasSchemaType || c.valueType == schemaType {
+                                       if hasSchemaType && c.valueType == schemaType {
                                                r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = mustDecodeTagValue(c.valueType, c.values[bc.idx])
                                        } else {
                                                r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = pbv1.NullTagValue
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 17ca9c37..18851e8d 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -1238,8 +1238,9 @@ func TestQueryResult(t *testing.T) {
                        verify := func(t *testing.T, tst *tsTable) {
                                defer tst.Close()
                                queryOpts := queryOptions{
-                                       minTimestamp: tt.minTimestamp,
-                                       maxTimestamp: tt.maxTimestamp,
+                                       schemaTagTypes: testSchemaTagTypes,
+                                       minTimestamp:   tt.minTimestamp,
+                                       maxTimestamp:   tt.maxTimestamp,
                                }
                                s := tst.currentSnapshot()
                                require.NotNil(t, s)
@@ -1467,8 +1468,9 @@ func TestQueryResult_QuotaExceeded(t *testing.T) {
                        }
                        defer tst.Close()
                        queryOpts := queryOptions{
-                               minTimestamp: tt.minTimestamp,
-                               maxTimestamp: tt.maxTimestamp,
+                               schemaTagTypes: testSchemaTagTypes,
+                               minTimestamp:   tt.minTimestamp,
+                               maxTimestamp:   tt.maxTimestamp,
                        }
                        queryOpts.TagProjection = tagProjections[1]
                        queryOpts.FieldProjection = fieldProjections[1]
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index e6ae63d8..afb44b91 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -282,6 +282,16 @@ var allTagProjections = []model.TagProjection{
        {Family: "singleTag", Names: []string{"strTag", "intTag", "strTag1", "strTag2"}},
 }
 
+var testSchemaTagTypes = map[string]pbv1.ValueType{
+       "strArrTag": pbv1.ValueTypeStrArr,
+       "intArrTag": pbv1.ValueTypeInt64Arr,
+       "binaryTag": pbv1.ValueTypeBinaryData,
+       "strTag":    pbv1.ValueTypeStr,
+       "intTag":    pbv1.ValueTypeInt64,
+       "strTag1":   pbv1.ValueTypeStr,
+       "strTag2":   pbv1.ValueTypeStr,
+}
+
 var fieldProjections = map[int][]string{
        1: {"strField", "intField", "floatField", "binaryField"},
        3: {"intField"},
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 24484800..f167c603 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -491,7 +491,7 @@ func (bc *blockCursor) copyAllTo(r *model.StreamResult, desc bool) {
                        schemaType, hasSchemaType := bc.schemaTagTypes[c.name]
                        for k := start; k < end; k++ {
                                if len(c.values) > k {
-                                       if !hasSchemaType || c.valueType == schemaType {
+                                       if hasSchemaType && c.valueType == schemaType {
                                                values[k-start] = mustDecodeTagValue(c.valueType, c.values[k])
                                        } else {
                                                values[k-start] = pbv1.NullTagValue
@@ -536,7 +536,7 @@ func (bc *blockCursor) copyTo(r *model.StreamResult) {
                for i2, c := range cf.tags {
                        schemaType, hasSchemaType := bc.schemaTagTypes[c.name]
                        if len(c.values) > bc.idx {
-                               if !hasSchemaType || c.valueType == schemaType {
+                               if hasSchemaType && c.valueType == schemaType {
                                        r.TagFamilies[i].Tags[i2].Values = append(r.TagFamilies[i].Tags[i2].Values, mustDecodeTagValue(c.valueType, c.values[bc.idx]))
                                } else {
                                        r.TagFamilies[i].Tags[i2].Values = append(r.TagFamilies[i].Tags[i2].Values, pbv1.NullTagValue)
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index b42e2c2b..1fd65bad 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -69,11 +69,11 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
        series := prepareSeriesData(sqo)
 
        schemaTagTypes := make(map[string]pbv1.ValueType)
-       if is := s.indexSchema.Load(); is != nil {
-               for name, spec := range is.(indexSchema).tagMap {
-                       vt := pbv1.TagValueSpecToValueType(spec.GetType())
+       for _, tf := range s.schema.GetTagFamilies() {
+               for _, tag := range tf.GetTags() {
+                       vt := pbv1.TagValueSpecToValueType(tag.GetType())
                        if vt != pbv1.ValueTypeUnknown {
-                               schemaTagTypes[name] = vt
+                               schemaTagTypes[tag.GetName()] = vt
                        }
                }
        }
@@ -85,7 +85,7 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
                return s.executeTimeSeriesQuery(segments, series, qo, &tr), nil
        }
 
-       return s.executeIndexedQuery(ctx, segments, series, sqo, &tr)
+       return s.executeIndexedQuery(ctx, segments, series, sqo, schemaTagTypes, &tr)
 }
 
 func validateQueryInput(sqo model.StreamQueryOptions) error {
@@ -165,9 +165,10 @@ func (s *stream) executeIndexedQuery(
        segments []storage.Segment[*tsTable, option],
        series []*pbv1.Series,
        sqo model.StreamQueryOptions,
+       schemaTagTypes map[string]pbv1.ValueType,
        tr *index.RangeOpts,
 ) (model.StreamQueryResult, error) {
-       result, seriesFilter, resultTS, err := s.processSegmentsAndBuildFilters(ctx, segments, series, sqo, tr)
+       result, seriesFilter, resultTS, err := s.processSegmentsAndBuildFilters(ctx, segments, series, sqo, schemaTagTypes, tr)
        if err != nil {
                return nil, err
        }
@@ -205,6 +206,7 @@ func (s *stream) processSegmentsAndBuildFilters(
        segments []storage.Segment[*tsTable, option],
        series []*pbv1.Series,
        sqo model.StreamQueryOptions,
+       schemaTagTypes map[string]pbv1.ValueType,
        tr *index.RangeOpts,
 ) (idxResult, posting.List, posting.List, error) {
        var result idxResult
@@ -213,6 +215,7 @@ func (s *stream) processSegmentsAndBuildFilters(
        result.sm = s
        result.qo = queryOptions{
                StreamQueryOptions: sqo,
+               schemaTagTypes:     schemaTagTypes,
                seriesToEntity:     make(map[common.SeriesID][]*modelv1.TagValue),
        }
 
diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go
index 0baadbad..df43d660 100644
--- a/banyand/stream/query_by_idx.go
+++ b/banyand/stream/query_by_idx.go
@@ -191,6 +191,7 @@ func (qr *idxResult) loadSortingData(ctx context.Context) *model.StreamResult {
        qo.StreamQueryOptions = qr.qo.StreamQueryOptions
        qo.elementFilter = roaring.NewPostingList()
        qo.seriesToEntity = qr.qo.seriesToEntity
+       qo.schemaTagTypes = qr.qo.schemaTagTypes
        qr.elementIDsSorted = qr.elementIDsSorted[:0]
        count, searchedSize := 1, 0
        tracer := query.GetTracer(ctx)
diff --git a/banyand/trace/block.go b/banyand/trace/block.go
index 25c9a53c..b4e05733 100644
--- a/banyand/trace/block.go
+++ b/banyand/trace/block.go
@@ -468,7 +468,7 @@ func (bc *blockCursor) copyAllTo(r *model.TraceResult) {
                schemaType, hasSchemaType := bc.schemaTagTypes[t.name]
                for k := range bc.spans {
                        if len(t.values) > k {
-                               if !hasSchemaType || t.valueType == schemaType {
+                               if hasSchemaType && t.valueType == schemaType {
                                        values[k] = mustDecodeTagValue(t.valueType, t.values[k])
                                } else {
                                        values[k] = pbv1.NullTagValue
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index a675f1f0..5fdbb6d5 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -84,12 +84,10 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T
        sort.Strings(tqo.TraceIDs)
 
        schemaTagTypes := make(map[string]pbv1.ValueType)
-       if is := t.indexSchema.Load(); is != nil {
-               for name, spec := range is.(indexSchema).tagMap {
-                       vt := pbv1.TagValueSpecToValueType(spec.GetType())
-                       if vt != pbv1.ValueTypeUnknown {
-                               schemaTagTypes[name] = vt
-                       }
+       for _, tag := range t.schema.GetTags() {
+               vt := pbv1.TagValueSpecToValueType(tag.GetType())
+               if vt != pbv1.ValueTypeUnknown {
+                       schemaTagTypes[tag.GetName()] = vt
                }
        }
 
diff --git a/banyand/trace/query_test.go b/banyand/trace/query_test.go
index bff0798d..f318d508 100644
--- a/banyand/trace/query_test.go
+++ b/banyand/trace/query_test.go
@@ -91,7 +91,9 @@ func TestQueryResult(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        verify := func(t *testing.T, tst *tsTable) {
                                defer tst.Close()
-                               queryOpts := queryOptions{}
+                               queryOpts := queryOptions{
+                                       schemaTagTypes: testSchemaTagTypes,
+                               }
                                s := tst.currentSnapshot()
                                require.NotNil(t, s)
                                defer s.decRef()
@@ -338,7 +340,9 @@ func TestQueryResultMultipleBatches(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        verify := func(t *testing.T, tst *tsTable) {
                                defer tst.Close()
-                               queryOpts := queryOptions{}
+                               queryOpts := queryOptions{
+                                       schemaTagTypes: testSchemaTagTypes,
+                               }
                                s := tst.currentSnapshot()
                                require.NotNil(t, s)
                                defer s.decRef()
diff --git a/banyand/trace/tstable_test.go b/banyand/trace/tstable_test.go
index 73d072b3..d6a3cdf1 100644
--- a/banyand/trace/tstable_test.go
+++ b/banyand/trace/tstable_test.go
@@ -229,6 +229,12 @@ var allTagProjections = &model.TagProjection{
        Names: []string{"strArrTag", "strTag", "intTag"},
 }
 
+var testSchemaTagTypes = map[string]pbv1.ValueType{
+       "strArrTag": pbv1.ValueTypeStrArr,
+       "strTag":    pbv1.ValueTypeStr,
+       "intTag":    pbv1.ValueTypeInt64,
+}
+
 var tsTS1 = &traces{
        traceIDs:   []string{"trace1", "trace2", "trace3"},
        timestamps: []int64{1, 1, 1},
diff --git a/pkg/query/logical/measure/measure_analyzer.go b/pkg/query/logical/measure/measure_analyzer.go
index 62a25b26..8932006c 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -101,11 +101,7 @@ func Analyze(criteria *measurev1.QueryRequest, metadata []*commonv1.Metadata, ss
        }
        fieldProjection := criteria.GetFieldProjection().GetNames()
        if len(fieldProjection) > 0 {
-               projFields := make([]*logical.Field, len(fieldProjection))
-               for i, fieldName := range fieldProjection {
-                       projFields[i] = logical.NewField(fieldName)
-               }
-               if err := ms.ValidateProjectionFields(projFields...); err != nil {
+               if err := ms.ValidateProjectionFields(fieldProjection...); err != nil {
                        return nil, err
                }
        }
diff --git a/pkg/query/logical/measure/measure_plan_groupby.go b/pkg/query/logical/measure/measure_plan_groupby.go
index 06ee92ef..6584fd71 100644
--- a/pkg/query/logical/measure/measure_plan_groupby.go
+++ b/pkg/query/logical/measure/measure_plan_groupby.go
@@ -64,6 +64,9 @@ func (gba *unresolvedGroup) Analyze(measureSchema logical.Schema) (logical.Plan,
        if err != nil {
                return nil, err
        }
+       if len(groupByTagRefs) == 0 {
+               return nil, errors.Wrap(logical.ErrTagNotDefined, "groupBy schema")
+       }
        return &groupBy{
                Parent: &logical.Parent{
                        UnresolvedInput: gba.unresolvedInput,
diff --git a/pkg/query/logical/measure/schema.go b/pkg/query/logical/measure/schema.go
index badc9ceb..3bdc81ca 100644
--- a/pkg/query/logical/measure/schema.go
+++ b/pkg/query/logical/measure/schema.go
@@ -67,10 +67,10 @@ func (m *schema) CreateFieldRef(fields ...*logical.Field) ([]*logical.FieldRef,
 }
 
 // ValidateProjectionFields checks if all fields in the projection exist in the schema.
-func (m *schema) ValidateProjectionFields(fields ...*logical.Field) error {
+func (m *schema) ValidateProjectionFields(fields ...string) error {
        for _, field := range fields {
-               if _, ok := m.fieldMap[field.Name]; !ok {
-                       return errors.Errorf("field %s not found in schema", field.Name)
+               if _, ok := m.fieldMap[field]; !ok {
+                       return errors.Errorf("field %s not found in schema", field)
                }
        }
        return nil
@@ -154,7 +154,16 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) {
                for name, spec := range mSchema.fieldMap {
                        if existing, exists := fieldMap[name]; exists {
                                if existing.Spec.FieldType != spec.Spec.FieldType {
-                                       existing.Spec.FieldType = databasev1.FieldType_FIELD_TYPE_UNSPECIFIED
+                                       // Create a copy to avoid modifying the original schema.
+                                       fieldMap[name] = &logical.FieldSpec{
+                                               FieldIdx: existing.FieldIdx,
+                                               Spec: &databasev1.FieldSpec{
+                                                       Name:              existing.Spec.Name,
+                                                       FieldType:         databasev1.FieldType_FIELD_TYPE_UNSPECIFIED,
+                                                       EncodingMethod:    existing.Spec.EncodingMethod,
+                                                       CompressionMethod: existing.Spec.CompressionMethod,
+                                               },
+                                       }
                                }
                        } else {
                                fieldMap[name] = &logical.FieldSpec{
diff --git a/pkg/query/logical/plan.go b/pkg/query/logical/plan.go
index 35821d0a..d9c9147a 100644
--- a/pkg/query/logical/plan.go
+++ b/pkg/query/logical/plan.go
@@ -34,9 +34,8 @@ type Parent struct {
 
 // OrderBy is the sorting operator.
 type OrderBy struct {
-       Index     *databasev1.IndexRule
-       fieldRefs []*TagRef
-       Sort      modelv1.Sort
+       Index *databasev1.IndexRule
+       Sort  modelv1.Sort
 }
 
 // Equal reports whether o and other has the same sorting order and name.
@@ -79,14 +78,8 @@ func ParseOrderBy(s Schema, indexRuleName string, sort modelv1.Sort) (*OrderBy,
                return nil, errors.Wrap(errIndexSortingUnsupported, indexRuleName)
        }
 
-       projFieldSpecs, err := s.CreateTagRef(NewTags("", indexRule.GetTags()...))
-       if err != nil {
-               return nil, errors.Wrap(ErrTagNotDefined, indexRuleName)
-       }
-
        return &OrderBy{
-               Sort:      sort,
-               Index:     indexRule,
-               fieldRefs: projFieldSpecs[0],
+               Sort:  sort,
+               Index: indexRule,
        }, nil
 }
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 51a59d04..d9ae7dcc 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -213,13 +213,6 @@ func MergeSchemas(schemas []*CommonSchema) (*CommonSchema, error) {
        indexRuleMap := make(map[string]*databasev1.IndexRule)
 
        for _, s := range schemas {
-               for tagName, tagSpec := range s.TagSpecMap {
-                       if existingSpec, exists := merged.TagSpecMap[tagName]; !exists {
-                               merged.TagSpecMap[tagName] = tagSpec
-                       } else if existingSpec.Spec.Type != tagSpec.Spec.Type {
-                               existingSpec.Spec.Type = databasev1.TagType_TAG_TYPE_UNSPECIFIED
-                       }
-               }
                for _, rule := range s.IndexRules {
                        if existedRule := indexRuleMap[rule.Metadata.Name]; existedRule == nil {
                                indexRuleMap[rule.Metadata.Name] = rule
@@ -235,7 +228,8 @@ func MergeSchemas(schemas []*CommonSchema) (*CommonSchema, error) {
        return merged, nil
 }
 
-func mergeTagSpecs(dst, src []*databasev1.TagSpec) []*databasev1.TagSpec {
+// MergeTagSpecs merges two slices of TagSpec.
+func MergeTagSpecs(dst, src []*databasev1.TagSpec) []*databasev1.TagSpec {
        res := make([]*databasev1.TagSpec, 0, len(dst)+len(src))
        res = append(res, dst...)
        for _, s := range src {
@@ -244,8 +238,11 @@ func mergeTagSpecs(dst, src []*databasev1.TagSpec) []*databasev1.TagSpec {
                        if d.Name == s.Name {
                                if d.Type != s.Type {
                                        // If the type is different, the tag spec is not compatible.
-                                       // We need to set the type to unspecifed.
-                                       res[i].Type = databasev1.TagType_TAG_TYPE_UNSPECIFIED
+                                       // Create a copy with type set to unspecified to avoid modifying the original schema.
+                                       res[i] = &databasev1.TagSpec{
+                                               Name: d.Name,
+                                               Type: databasev1.TagType_TAG_TYPE_UNSPECIFIED,
+                                       }
                                }
                                found = true
                                break
@@ -266,7 +263,11 @@ func MergeTagFamilySpecs(dst []*databasev1.TagFamilySpec, src []*databasev1.TagF
                found := false
                for i, d := range dst {
                        if d.Name == s.Name {
-                               res[i].Tags = mergeTagSpecs(d.Tags, s.Tags)
+                               // Create a copy to avoid modifying the original schema.
+                               res[i] = &databasev1.TagFamilySpec{
+                                       Name: d.Name,
+                                       Tags: MergeTagSpecs(d.Tags, s.Tags),
+                               }
                                found = true
                                break
                        }
diff --git a/pkg/query/logical/trace/schema.go b/pkg/query/logical/trace/schema.go
index 59c2d54e..b0475764 100644
--- a/pkg/query/logical/trace/schema.go
+++ b/pkg/query/logical/trace/schema.go
@@ -121,6 +121,7 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) {
                return schemas[0], nil
        }
        var commonSchemas []*logical.CommonSchema
+       var allTags []*databasev1.TagSpec
        for _, sm := range schemas {
                if sm == nil {
                        continue
@@ -129,6 +130,15 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) {
                if s == nil {
                        continue
                }
+               traceTags := s.trace.GetTags()
+               tagSpecs := make([]*databasev1.TagSpec, len(traceTags))
+               for i, tt := range traceTags {
+                       tagSpecs[i] = &databasev1.TagSpec{
+                               Name: tt.GetName(),
+                               Type: tt.GetType(),
+                       }
+               }
+               allTags = logical.MergeTagSpecs(allTags, tagSpecs)
                commonSchemas = append(commonSchemas, s.common)
        }
        merged, err := logical.MergeSchemas(commonSchemas)
@@ -139,5 +149,8 @@ func mergeSchema(schemas []logical.Schema) (logical.Schema, error) {
                common:   merged,
                children: schemas,
        }
+       for i, tag := range allTags {
+               ret.common.RegisterTag(0, i, tag)
+       }
        return ret, nil
 }

Reply via email to