This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch measure-aggregation
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/measure-aggregation by this 
push:
     new bab0b89  add groupBy test
bab0b89 is described below

commit bab0b8935ab159e3075df2d1962a86d3f0f5bbb2
Author: Megrez Lu <[email protected]>
AuthorDate: Tue Mar 8 12:05:45 2022 +0800

    add groupBy test
---
 pkg/query/logical/measure_plan_execution_test.go  | 65 +++++++++++++++++++++++
 pkg/query/logical/measure_plan_groupby.go         |  3 +-
 pkg/query/logical/measure_plan_indexscan_local.go |  2 +-
 pkg/query/logical/schema.go                       | 15 ++++--
 4 files changed, 78 insertions(+), 7 deletions(-)

diff --git a/pkg/query/logical/measure_plan_execution_test.go 
b/pkg/query/logical/measure_plan_execution_test.go
index 82b35e9..bea7288 100644
--- a/pkg/query/logical/measure_plan_execution_test.go
+++ b/pkg/query/logical/measure_plan_execution_test.go
@@ -19,6 +19,7 @@ package logical_test
 
 import (
        "context"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "testing"
        "time"
 
@@ -112,3 +113,67 @@ func TestMeasurePlanExecution_IndexScan(t *testing.T) {
                })
        }
 }
+
+func TestMeasurePlanExecution_GroupByAndIndexScan(t *testing.T) {
+       tester := require.New(t)
+       measureSvc, metaService, deferFunc := setupMeasure(tester)
+       defer deferFunc()
+       baseTs := setupMeasureQueryData(t, "measure_query_data.json", 
measureSvc)
+
+       metadata := &commonv1.Metadata{
+               Name:  "cpm",
+               Group: "default",
+       }
+
+       sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+       analyzer, err := 
logical.CreateMeasureAnalyzerFromMetaService(metaService)
+       tester.NoError(err)
+       tester.NotNil(analyzer)
+
+       tests := []struct {
+               name           string
+               unresolvedPlan logical.UnresolvedPlan
+               wantLength     int
+               tagLength      []int
+               fieldLength    int
+       }{
+               {
+                       name: "Group By with Max",
+                       unresolvedPlan: logical.GroupByAggregation(
+                               logical.MeasureIndexScan(sT, eT, metadata, 
[]logical.Expr{
+                                       logical.Eq(logical.NewTagRef("default", 
"scope"), logical.Str("minute")),
+                               }, tsdb.Entity{tsdb.AnyEntry}, 
[][]*logical.Tag{logical.NewTags("default", "scope")}, 
[]*logical.Field{logical.NewField("value")}),
+                               logical.NewField("value"), 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX,
+                               [][]*logical.Tag{logical.NewTags("default", 
"scope")},
+                       ),
+                       wantLength:  1,
+                       tagLength:   []int{1},
+                       fieldLength: 1,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       tester := require.New(t)
+                       schema, err := 
analyzer.BuildMeasureSchema(context.TODO(), metadata)
+                       tester.NoError(err)
+
+                       plan, err := tt.unresolvedPlan.Analyze(schema)
+                       tester.NoError(err)
+                       tester.NotNil(plan)
+
+                       dataPoints, err := 
plan.(executor.MeasureExecutable).Execute(measureSvc)
+                       tester.NoError(err)
+                       tester.Len(dataPoints, tt.wantLength)
+
+                       for _, dp := range dataPoints {
+                               tester.Len(dp.GetFields(), tt.fieldLength)
+                               tester.Len(dp.GetTagFamilies(), 
len(tt.tagLength))
+                               for tagFamilyIdx, tagFamily := range 
dp.GetTagFamilies() {
+                                       tester.Len(tagFamily.GetTags(), 
tt.tagLength[tagFamilyIdx])
+                               }
+                       }
+               })
+       }
+}
diff --git a/pkg/query/logical/measure_plan_groupby.go 
b/pkg/query/logical/measure_plan_groupby.go
index 8e6a72e..5eeb07b 100644
--- a/pkg/query/logical/measure_plan_groupby.go
+++ b/pkg/query/logical/measure_plan_groupby.go
@@ -106,7 +106,7 @@ func (g *groupByAggregation) Children() []Plan {
 }
 
 func (g *groupByAggregation) Schema() Schema {
-       return 
g.schema.ProjField(g.aggregationFieldRef).ProjTags(g.groupByTagsRefs...)
+       return 
g.schema.ProjFields(g.aggregationFieldRef).ProjTags(g.groupByTagsRefs...)
 }
 
 func (g *groupByAggregation) Execute(ec executor.MeasureExecutionContext) 
([]*measurev1.DataPoint, error) {
@@ -123,6 +123,7 @@ func (g *groupByAggregation) Execute(ec 
executor.MeasureExecutionContext) ([]*me
                op, ok := aggregationMap[key]
                if !ok {
                        op = g.createAggregationOp()
+                       aggregationMap[key] = op
                }
                if op == nil {
                        return nil, errors.New("aggregation op does not exist")
diff --git a/pkg/query/logical/measure_plan_indexscan_local.go 
b/pkg/query/logical/measure_plan_indexscan_local.go
index 5e0dd1f..06fe554 100644
--- a/pkg/query/logical/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure_plan_indexscan_local.go
@@ -222,7 +222,7 @@ func (i *localMeasureIndexScan) Schema() Schema {
        if len(i.projectionTagsRefs) == 0 {
                return i.schema
        }
-       return i.schema.ProjTags(i.projectionTagsRefs...)
+       return 
i.schema.ProjTags(i.projectionTagsRefs...).ProjFields(i.projectionFieldsRefs...)
 }
 
 func (i *localMeasureIndexScan) Equal(plan Plan) bool {
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 462f2b1..534f594 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -34,7 +34,7 @@ type Schema interface {
        CreateTagRef(tags ...[]*Tag) ([][]*TagRef, error)
        CreateFieldRef(fields ...*Field) ([]*FieldRef, error)
        ProjTags(refs ...[]*TagRef) Schema
-       ProjField(*FieldRef) Schema
+       ProjFields(refs ...*FieldRef) Schema
        Equal(Schema) bool
        ShardNumber() uint32
        TraceIDFieldName() string
@@ -203,7 +203,7 @@ func (s *streamSchema) ProjTags(refs ...[]*TagRef) Schema {
        return newSchema
 }
 
-func (s *streamSchema) ProjField(*FieldRef) Schema {
+func (s *streamSchema) ProjFields(...*FieldRef) Schema {
        panic("stream does not support field")
 }
 
@@ -272,10 +272,15 @@ func (m *measureSchema) ProjTags(refs ...[]*TagRef) 
Schema {
        return newSchema
 }
 
-func (m *measureSchema) ProjField(fieldRef *FieldRef) Schema {
+func (m *measureSchema) ProjFields(fieldRefs ...*FieldRef) Schema {
        newFieldMap := make(map[string]*fieldSpec)
-       if spec, ok := m.fieldMap[fieldRef.field.name]; ok {
-               newFieldMap[fieldRef.field.name] = spec
+       i := 0
+       for _, fr := range fieldRefs {
+               if spec, ok := m.fieldMap[fr.field.name]; ok {
+                       spec.FieldIdx = i
+                       newFieldMap[fr.field.name] = spec
+               }
+               i++
        }
        return &measureSchema{
                measure:  m.measure,

Reply via email to