This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch measure-aggregation
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/measure-aggregation by this
push:
new bab0b89 add groupBy test
bab0b89 is described below
commit bab0b8935ab159e3075df2d1962a86d3f0f5bbb2
Author: Megrez Lu <[email protected]>
AuthorDate: Tue Mar 8 12:05:45 2022 +0800
add groupBy test
---
pkg/query/logical/measure_plan_execution_test.go | 65 +++++++++++++++++++++++
pkg/query/logical/measure_plan_groupby.go | 3 +-
pkg/query/logical/measure_plan_indexscan_local.go | 2 +-
pkg/query/logical/schema.go | 15 ++++--
4 files changed, 78 insertions(+), 7 deletions(-)
diff --git a/pkg/query/logical/measure_plan_execution_test.go
b/pkg/query/logical/measure_plan_execution_test.go
index 82b35e9..bea7288 100644
--- a/pkg/query/logical/measure_plan_execution_test.go
+++ b/pkg/query/logical/measure_plan_execution_test.go
@@ -19,6 +19,7 @@ package logical_test
import (
"context"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"testing"
"time"
@@ -112,3 +113,67 @@ func TestMeasurePlanExecution_IndexScan(t *testing.T) {
})
}
}
+
+func TestMeasurePlanExecution_GroupByAndIndexScan(t *testing.T) {
+ tester := require.New(t)
+ measureSvc, metaService, deferFunc := setupMeasure(tester)
+ defer deferFunc()
+ baseTs := setupMeasureQueryData(t, "measure_query_data.json",
measureSvc)
+
+ metadata := &commonv1.Metadata{
+ Name: "cpm",
+ Group: "default",
+ }
+
+ sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+ analyzer, err :=
logical.CreateMeasureAnalyzerFromMetaService(metaService)
+ tester.NoError(err)
+ tester.NotNil(analyzer)
+
+ tests := []struct {
+ name string
+ unresolvedPlan logical.UnresolvedPlan
+ wantLength int
+ tagLength []int
+ fieldLength int
+ }{
+ {
+ name: "Group By with Max",
+ unresolvedPlan: logical.GroupByAggregation(
+ logical.MeasureIndexScan(sT, eT, metadata,
[]logical.Expr{
+ logical.Eq(logical.NewTagRef("default",
"scope"), logical.Str("minute")),
+ }, tsdb.Entity{tsdb.AnyEntry},
[][]*logical.Tag{logical.NewTags("default", "scope")},
[]*logical.Field{logical.NewField("value")}),
+ logical.NewField("value"),
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX,
+ [][]*logical.Tag{logical.NewTags("default",
"scope")},
+ ),
+ wantLength: 1,
+ tagLength: []int{1},
+ fieldLength: 1,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tester := require.New(t)
+ schema, err :=
analyzer.BuildMeasureSchema(context.TODO(), metadata)
+ tester.NoError(err)
+
+ plan, err := tt.unresolvedPlan.Analyze(schema)
+ tester.NoError(err)
+ tester.NotNil(plan)
+
+ dataPoints, err :=
plan.(executor.MeasureExecutable).Execute(measureSvc)
+ tester.NoError(err)
+ tester.Len(dataPoints, tt.wantLength)
+
+ for _, dp := range dataPoints {
+ tester.Len(dp.GetFields(), tt.fieldLength)
+ tester.Len(dp.GetTagFamilies(),
len(tt.tagLength))
+ for tagFamilyIdx, tagFamily := range
dp.GetTagFamilies() {
+ tester.Len(tagFamily.GetTags(),
tt.tagLength[tagFamilyIdx])
+ }
+ }
+ })
+ }
+}
diff --git a/pkg/query/logical/measure_plan_groupby.go
b/pkg/query/logical/measure_plan_groupby.go
index 8e6a72e..5eeb07b 100644
--- a/pkg/query/logical/measure_plan_groupby.go
+++ b/pkg/query/logical/measure_plan_groupby.go
@@ -106,7 +106,7 @@ func (g *groupByAggregation) Children() []Plan {
}
func (g *groupByAggregation) Schema() Schema {
- return
g.schema.ProjField(g.aggregationFieldRef).ProjTags(g.groupByTagsRefs...)
+ return
g.schema.ProjFields(g.aggregationFieldRef).ProjTags(g.groupByTagsRefs...)
}
func (g *groupByAggregation) Execute(ec executor.MeasureExecutionContext)
([]*measurev1.DataPoint, error) {
@@ -123,6 +123,7 @@ func (g *groupByAggregation) Execute(ec
executor.MeasureExecutionContext) ([]*me
op, ok := aggregationMap[key]
if !ok {
op = g.createAggregationOp()
+ aggregationMap[key] = op
}
if op == nil {
return nil, errors.New("aggregation op does not exist")
diff --git a/pkg/query/logical/measure_plan_indexscan_local.go
b/pkg/query/logical/measure_plan_indexscan_local.go
index 5e0dd1f..06fe554 100644
--- a/pkg/query/logical/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure_plan_indexscan_local.go
@@ -222,7 +222,7 @@ func (i *localMeasureIndexScan) Schema() Schema {
if len(i.projectionTagsRefs) == 0 {
return i.schema
}
- return i.schema.ProjTags(i.projectionTagsRefs...)
+ return
i.schema.ProjTags(i.projectionTagsRefs...).ProjFields(i.projectionFieldsRefs...)
}
func (i *localMeasureIndexScan) Equal(plan Plan) bool {
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 462f2b1..534f594 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -34,7 +34,7 @@ type Schema interface {
CreateTagRef(tags ...[]*Tag) ([][]*TagRef, error)
CreateFieldRef(fields ...*Field) ([]*FieldRef, error)
ProjTags(refs ...[]*TagRef) Schema
- ProjField(*FieldRef) Schema
+ ProjFields(refs ...*FieldRef) Schema
Equal(Schema) bool
ShardNumber() uint32
TraceIDFieldName() string
@@ -203,7 +203,7 @@ func (s *streamSchema) ProjTags(refs ...[]*TagRef) Schema {
return newSchema
}
-func (s *streamSchema) ProjField(*FieldRef) Schema {
+func (s *streamSchema) ProjFields(...*FieldRef) Schema {
panic("stream does not support field")
}
@@ -272,10 +272,15 @@ func (m *measureSchema) ProjTags(refs ...[]*TagRef)
Schema {
return newSchema
}
-func (m *measureSchema) ProjField(fieldRef *FieldRef) Schema {
+func (m *measureSchema) ProjFields(fieldRefs ...*FieldRef) Schema {
newFieldMap := make(map[string]*fieldSpec)
- if spec, ok := m.fieldMap[fieldRef.field.name]; ok {
- newFieldMap[fieldRef.field.name] = spec
+ i := 0
+ for _, fr := range fieldRefs {
+ if spec, ok := m.fieldMap[fr.field.name]; ok {
+ spec.FieldIdx = i
+ newFieldMap[fr.field.name] = spec
+ }
+ i++
}
return &measureSchema{
measure: m.measure,