This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new c12c4ed21 Map-Reduce Aggregation Redesign (#970)
c12c4ed21 is described below
commit c12c4ed214488eed23b3ed608b93906562e1aa84
Author: OmCheeLin <[email protected]>
AuthorDate: Sun Feb 22 17:17:02 2026 +0800
Map-Reduce Aggregation Redesign (#970)
* Enhance measure query capabilities with map-reduce aggregation support
and improve topN post-processor logic. Fix error handling in aggregation
iterator close method.
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
api/proto/banyandb/measure/v1/query.proto | 4 +-
banyand/measure/topn_post_processor.go | 33 ++--
banyand/query/processor.go | 15 +-
docs/api-reference.md | 3 +-
pkg/query/aggregation/aggregation.go | 90 +++++++++-
pkg/query/aggregation/function.go | 119 +++++++++++++
pkg/query/logical/measure/measure_analyzer.go | 27 ++-
.../logical/measure/measure_plan_aggregation.go | 185 +++++++++++++++------
.../logical/measure/measure_plan_distributed.go | 96 ++++++-----
pkg/query/logical/measure/topn_analyzer.go | 4 +-
test/cases/measure/data/input/group_mean.ql | 21 +++
test/cases/measure/data/input/group_mean.yaml | 34 ++++
test/cases/measure/data/want/group_mean.yaml | 54 ++++++
test/cases/measure/measure.go | 1 +
15 files changed, 546 insertions(+), 141 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 3ffc1b9be..b9c8ff244 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -23,6 +23,7 @@ Release Notes.
- From: `<data-dir>/property/data/shard-<id>/...`
- To: `<data-dir>/property/data/<group>/shard-<id>/...`
- Add a generic snapshot coordination package for atomic snapshot transitions
across trace and sidx.
+- Support map-reduce aggregation for measure queries: map phase (partial aggregation on data nodes) and reduce phase (final aggregation on liaison).
### Bug Fixes
diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto
index a3d10adbd..e518762cc 100644
--- a/api/proto/banyandb/measure/v1/query.proto
+++ b/api/proto/banyandb/measure/v1/query.proto
@@ -69,6 +69,8 @@ message InternalDataPoint {
message InternalQueryRequest {
// The actual query request
QueryRequest request = 1;
+ // agg_return_partial when true asks data nodes to return aggregation partials (for reduce at liaison)
+ bool agg_return_partial = 2;
}
// InternalQueryResponse is the internal response for distributed query.
@@ -138,6 +140,6 @@ message QueryRequest {
bool trace = 13;
// stages is used to specify the stage of the data points in the lifecycle
repeated string stages = 14;
- // rewriteAggTopNResult will rewrite agg result to raw data
+ // rewrite_agg_top_n_result will rewrite agg result to raw data
bool rewrite_agg_top_n_result = 15;
}
diff --git a/banyand/measure/topn_post_processor.go b/banyand/measure/topn_post_processor.go
index 4d219d73c..a97100c4a 100644
--- a/banyand/measure/topn_post_processor.go
+++ b/banyand/measure/topn_post_processor.go
@@ -70,9 +70,9 @@ func (taggr *topNPostProcessor) Len() int {
// while for ASC, a max heap has to be built.
func (taggr *topNPostProcessor) Less(i, j int) bool {
if taggr.sort == modelv1.Sort_SORT_DESC {
- return taggr.items[i].int64Func.Val() <
taggr.items[j].int64Func.Val()
+ return taggr.items[i].mapFunc.Val() <
taggr.items[j].mapFunc.Val()
}
- return taggr.items[i].int64Func.Val() > taggr.items[j].int64Func.Val()
+ return taggr.items[i].mapFunc.Val() > taggr.items[j].mapFunc.Val()
}
func (taggr *topNPostProcessor) Swap(i, j int) {
@@ -99,9 +99,12 @@ func (taggr *topNPostProcessor) Pop() any {
}
func (taggr *topNPostProcessor) tryEnqueue(key string, item
*topNAggregatorItem) {
+ if len(taggr.items) == 0 {
+ return
+ }
if lowest := taggr.items[0]; lowest != nil {
- shouldReplace := (taggr.sort == modelv1.Sort_SORT_DESC &&
lowest.int64Func.Val() < item.int64Func.Val()) ||
- (taggr.sort != modelv1.Sort_SORT_DESC &&
lowest.int64Func.Val() > item.int64Func.Val())
+ shouldReplace := (taggr.sort == modelv1.Sort_SORT_DESC &&
lowest.mapFunc.Val() < item.mapFunc.Val()) ||
+ (taggr.sort != modelv1.Sort_SORT_DESC &&
lowest.mapFunc.Val() > item.mapFunc.Val())
if shouldReplace {
delete(taggr.cache, lowest.key)
@@ -116,12 +119,12 @@ func (taggr *topNPostProcessor) tryEnqueue(key string,
item *topNAggregatorItem)
var _ flow.Element = (*topNAggregatorItem)(nil)
type topNAggregatorItem struct {
- int64Func aggregation.Func[int64]
- key string
- values pbv1.EntityValues
- val int64
- version int64
- index int
+ mapFunc aggregation.Map[int64]
+ key string
+ values pbv1.EntityValues
+ val int64
+ version int64
+ index int
}
func (n *topNAggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
@@ -245,18 +248,18 @@ func (taggr *topNPostProcessor) Flush()
([]*topNAggregatorItem, error) {
for _, timeline := range taggr.timelines {
for _, item := range timeline.items {
if exist, found := taggr.cache[item.key]; found
{
- exist.int64Func.In(item.val)
+ exist.mapFunc.In(item.val)
heap.Fix(taggr, exist.index)
continue
}
- aggrFunc, err :=
aggregation.NewFunc[int64](taggr.aggrFunc)
+ mapFunc, err :=
aggregation.NewMap[int64](taggr.aggrFunc)
if err != nil {
return nil, err
}
- item.int64Func = aggrFunc
- item.int64Func.In(item.val)
+ item.mapFunc = mapFunc
+ item.mapFunc.In(item.val)
if taggr.Len() < int(taggr.topN) {
taggr.cache[item.key] = item
@@ -300,7 +303,7 @@ func (taggr *topNPostProcessor) valWithAggregation(tagNames
[]string) ([]*measur
Entity: item.GetTags(tagNames),
Value: &modelv1.FieldValue{
Value: &modelv1.FieldValue_Int{
- Int: &modelv1.Int{Value:
item.int64Func.Val()},
+ Int: &modelv1.Int{Value:
item.mapFunc.Val()},
},
},
}
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index cbe7d783e..c6efbbda2 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -202,8 +202,13 @@ func buildMeasureContext(measureService measure.Service,
log *logger.Logger, que
}
// executeMeasurePlan executes the measure query plan and returns the iterator.
-func executeMeasurePlan(ctx context.Context, queryCriteria
*measurev1.QueryRequest, mctx *measureExecutionContext) (executor.MIterator,
logical.Plan, error) {
- plan, planErr := logical_measure.Analyze(queryCriteria, mctx.metadata,
mctx.schemas, mctx.ecc)
+func executeMeasurePlan(
+ ctx context.Context,
+ queryCriteria *measurev1.QueryRequest,
+ mctx *measureExecutionContext,
+ emitPartial bool,
+) (executor.MIterator, logical.Plan, error) {
+ plan, planErr := logical_measure.Analyze(queryCriteria, mctx.metadata,
mctx.schemas, mctx.ecc, emitPartial)
if planErr != nil {
return nil, nil, fmt.Errorf("fail to analyze the query request
for measure %s: %w", queryCriteria.GetName(), planErr)
}
@@ -343,7 +348,7 @@ func (p *measureQueryProcessor) executeQuery(ctx
context.Context, queryCriteria
e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received a
query event")
}
- mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, mctx)
+ mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria,
mctx, false)
if execErr != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v",
execErr))
return
@@ -462,7 +467,7 @@ func (p *measureInternalQueryProcessor) Rev(ctx
context.Context, message bus.Mes
e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received an
internal query event")
}
- mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, mctx)
+ mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria,
mctx, internalRequest.GetAggReturnPartial())
if execErr != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v",
execErr))
return
@@ -507,7 +512,7 @@ func (p *measureInternalQueryProcessor) Rev(ctx
context.Context, message bus.Mes
mctx.ml.Error().Err(rewriteErr).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
} else {
rewriteQueryCriteria :=
buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria)
- rewriteIterator, _, rewriteExecErr :=
executeMeasurePlan(ctx, rewriteQueryCriteria, mctx)
+ rewriteIterator, _, rewriteExecErr :=
executeMeasurePlan(ctx, rewriteQueryCriteria, mctx, false)
if rewriteExecErr != nil {
mctx.ml.Error().Err(rewriteExecErr).RawJSON("req",
logger.Proto(rewriteQueryCriteria)).Msg("fail to execute the rewrite query
plan")
} else {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index aa75e0bca..764624cc9 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -960,6 +960,7 @@ Wraps QueryRequest for extensibility.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| request | [QueryRequest](#banyandb-measure-v1-QueryRequest) | | The actual
query request |
+| agg_return_partial | [bool](#bool) | | agg_return_partial when true asks
data nodes to return aggregation partials (for reduce at liaison) |
@@ -1005,7 +1006,7 @@ QueryRequest is the request contract for query.
| order_by | [banyandb.model.v1.QueryOrder](#banyandb-model-v1-QueryOrder) |
| order_by is given to specify the sort for a tag. |
| trace | [bool](#bool) | | trace is used to enable trace for the query |
| stages | [string](#string) | repeated | stages is used to specify the stage
of the data points in the lifecycle |
-| rewrite_agg_top_n_result | [bool](#bool) | | rewriteAggTopNResult will
rewrite agg result to raw data |
+| rewrite_agg_top_n_result | [bool](#bool) | | rewrite_agg_top_n_result will
rewrite agg result to raw data |
diff --git a/pkg/query/aggregation/aggregation.go b/pkg/query/aggregation/aggregation.go
index a581e670a..c520a39d8 100644
--- a/pkg/query/aggregation/aggregation.go
+++ b/pkg/query/aggregation/aggregation.go
@@ -31,10 +31,26 @@ var (
errUnSupportedFieldType = errors.New("unsupported field type")
)
-// Func supports aggregation operations.
-type Func[N Number] interface {
+// Partial represents the intermediate result of a Map phase.
+// For most functions only Value is meaningful; for MEAN both Value (sum) and Count are used.
+type Partial[N Number] struct {
+ Value N
+ Count N
+}
+
+// Map accumulates raw values and produces aggregation results.
+// It serves as the local accumulator for raw data points.
+type Map[N Number] interface {
In(N)
Val() N
+ Partial() Partial[N]
+ Reset()
+}
+
+// Reduce combines intermediate results from Map phases into a final value.
+type Reduce[N Number] interface {
+ Combine(Partial[N])
+ Val() N
Reset()
}
@@ -43,9 +59,9 @@ type Number interface {
~int64 | ~float64
}
-// NewFunc returns a aggregation function based on function type.
-func NewFunc[N Number](af modelv1.AggregationFunction) (Func[N], error) {
- var result Func[N]
+// NewMap returns a Map aggregation function for the given type.
+func NewMap[N Number](af modelv1.AggregationFunction) (Map[N], error) {
+ var result Map[N]
switch af {
case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN:
result = &meanFunc[N]{zero: zero[N]()}
@@ -64,6 +80,27 @@ func NewFunc[N Number](af modelv1.AggregationFunction)
(Func[N], error) {
return result, nil
}
+// NewReduce returns a Reduce aggregation function for the given type.
+func NewReduce[N Number](af modelv1.AggregationFunction) (Reduce[N], error) {
+ var result Reduce[N]
+ switch af {
+ case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN:
+ result = &meanReduceFunc[N]{zero: zero[N]()}
+ case modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT:
+ result = &countReduceFunc[N]{zero: zero[N]()}
+ case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX:
+ result = &maxReduceFunc[N]{min: minOf[N]()}
+ case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN:
+ result = &minReduceFunc[N]{max: maxOf[N]()}
+ case modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM:
+ result = &sumReduceFunc[N]{zero: zero[N]()}
+ default:
+ return nil, errors.WithMessagef(errUnknownFunc, "unknown
function:%s", modelv1.AggregationFunction_name[int32(af)])
+ }
+ result.Reset()
+ return result, nil
+}
+
// FromFieldValue transforms modelv1.FieldValue to Number.
func FromFieldValue[N Number](fieldValue *modelv1.FieldValue) (N, error) {
switch fieldValue.GetValue().(type) {
@@ -86,6 +123,49 @@ func ToFieldValue[N Number](value N) (*modelv1.FieldValue,
error) {
return nil, errUnSupportedFieldType
}
+// PartialToFieldValues converts a Partial to field values for wire transport.
+// For MEAN it returns two values (Value/sum first, Count second); for others one value.
+func PartialToFieldValues[N Number](af modelv1.AggregationFunction, p
Partial[N]) ([]*modelv1.FieldValue, error) {
+ if af == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN {
+ vFv, err := ToFieldValue(p.Value)
+ if err != nil {
+ return nil, err
+ }
+ cFv, err := ToFieldValue(p.Count)
+ if err != nil {
+ return nil, err
+ }
+ return []*modelv1.FieldValue{vFv, cFv}, nil
+ }
+ vFv, err := ToFieldValue(p.Value)
+ if err != nil {
+ return nil, err
+ }
+ return []*modelv1.FieldValue{vFv}, nil
+}
+
+// FieldValuesToPartial converts field values from wire transport to a Partial.
+// For MEAN expects two values (sum, count); for others one value (Count will be zero).
+func FieldValuesToPartial[N Number](af modelv1.AggregationFunction, fvs
[]*modelv1.FieldValue) (Partial[N], error) {
+ var p Partial[N]
+ if len(fvs) == 0 {
+ return p, nil
+ }
+ v, err := FromFieldValue[N](fvs[0])
+ if err != nil {
+ return p, err
+ }
+ p.Value = v
+ if af == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN &&
len(fvs) >= 2 {
+ c, err := FromFieldValue[N](fvs[1])
+ if err != nil {
+ return p, err
+ }
+ p.Count = c
+ }
+ return p, nil
+}
+
func minOf[N Number]() (r N) {
switch x := any(&r).(type) {
case *int64:
diff --git a/pkg/query/aggregation/function.go b/pkg/query/aggregation/function.go
index faa019d97..0e009dac2 100644
--- a/pkg/query/aggregation/function.go
+++ b/pkg/query/aggregation/function.go
@@ -39,11 +39,42 @@ func (m meanFunc[N]) Val() N {
return v
}
+func (m meanFunc[N]) Partial() Partial[N] {
+ return Partial[N]{Value: m.sum, Count: m.count}
+}
+
func (m *meanFunc[N]) Reset() {
m.sum = m.zero
m.count = m.zero
}
+type meanReduceFunc[N Number] struct {
+ sum N
+ count N
+ zero N
+}
+
+func (m *meanReduceFunc[N]) Combine(p Partial[N]) {
+ m.sum += p.Value
+ m.count += p.Count
+}
+
+func (m meanReduceFunc[N]) Val() N {
+ if m.count == m.zero {
+ return m.zero
+ }
+ v := m.sum / m.count
+ if v < 1 {
+ return 1
+ }
+ return v
+}
+
+func (m *meanReduceFunc[N]) Reset() {
+ m.sum = m.zero
+ m.count = m.zero
+}
+
type countFunc[N Number] struct {
count N
zero N
@@ -57,10 +88,31 @@ func (c countFunc[N]) Val() N {
return c.count
}
+func (c countFunc[N]) Partial() Partial[N] {
+ return Partial[N]{Value: c.count}
+}
+
func (c *countFunc[N]) Reset() {
c.count = c.zero
}
+type countReduceFunc[N Number] struct {
+ sum N
+ zero N
+}
+
+func (c *countReduceFunc[N]) Combine(p Partial[N]) {
+ c.sum += p.Value
+}
+
+func (c countReduceFunc[N]) Val() N {
+ return c.sum
+}
+
+func (c *countReduceFunc[N]) Reset() {
+ c.sum = c.zero
+}
+
type sumFunc[N Number] struct {
sum N
zero N
@@ -74,10 +126,31 @@ func (s sumFunc[N]) Val() N {
return s.sum
}
+func (s sumFunc[N]) Partial() Partial[N] {
+ return Partial[N]{Value: s.sum}
+}
+
func (s *sumFunc[N]) Reset() {
s.sum = s.zero
}
+type sumReduceFunc[N Number] struct {
+ sum N
+ zero N
+}
+
+func (s *sumReduceFunc[N]) Combine(p Partial[N]) {
+ s.sum += p.Value
+}
+
+func (s sumReduceFunc[N]) Val() N {
+ return s.sum
+}
+
+func (s *sumReduceFunc[N]) Reset() {
+ s.sum = s.zero
+}
+
type maxFunc[N Number] struct {
val N
min N
@@ -93,10 +166,33 @@ func (m maxFunc[N]) Val() N {
return m.val
}
+func (m maxFunc[N]) Partial() Partial[N] {
+ return Partial[N]{Value: m.val}
+}
+
func (m *maxFunc[N]) Reset() {
m.val = m.min
}
+type maxReduceFunc[N Number] struct {
+ val N
+ min N
+}
+
+func (m *maxReduceFunc[N]) Combine(p Partial[N]) {
+ if p.Value > m.val {
+ m.val = p.Value
+ }
+}
+
+func (m maxReduceFunc[N]) Val() N {
+ return m.val
+}
+
+func (m *maxReduceFunc[N]) Reset() {
+ m.val = m.min
+}
+
type minFunc[N Number] struct {
val N
max N
@@ -112,6 +208,29 @@ func (m minFunc[N]) Val() N {
return m.val
}
+func (m minFunc[N]) Partial() Partial[N] {
+ return Partial[N]{Value: m.val}
+}
+
func (m *minFunc[N]) Reset() {
m.val = m.max
}
+
+type minReduceFunc[N Number] struct {
+ val N
+ max N
+}
+
+func (m *minReduceFunc[N]) Combine(p Partial[N]) {
+ if m.val == m.max || p.Value < m.val {
+ m.val = p.Value
+ }
+}
+
+func (m minReduceFunc[N]) Val() N {
+ return m.val
+}
+
+func (m *minReduceFunc[N]) Reset() {
+ m.val = m.max
+}
diff --git a/pkg/query/logical/measure/measure_analyzer.go b/pkg/query/logical/measure/measure_analyzer.go
index f92c5127e..d096b14ac 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -24,7 +24,6 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
- modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
@@ -55,7 +54,9 @@ func BuildSchema(md *databasev1.Measure, indexRules
[]*databasev1.IndexRule) (lo
}
// Analyze converts logical expressions to executable operation tree
represented by Plan.
-func Analyze(criteria *measurev1.QueryRequest, metadata []*commonv1.Metadata,
ss []logical.Schema, ecc []executor.MeasureExecutionContext) (logical.Plan,
error) {
+func Analyze(criteria *measurev1.QueryRequest, metadata []*commonv1.Metadata,
ss []logical.Schema,
+ ecc []executor.MeasureExecutionContext, emitPartial bool,
+) (logical.Plan, error) {
if len(metadata) != len(ss) {
return nil, fmt.Errorf("number of schemas %d not equal to
metadata count %d", len(ss), len(metadata))
}
@@ -123,6 +124,8 @@ func Analyze(criteria *measurev1.QueryRequest, metadata
[]*commonv1.Metadata, ss
logical.NewField(criteria.GetAgg().GetFieldName()),
criteria.GetAgg().GetFunction(),
criteria.GetGroupBy() != nil,
+ emitPartial,
+ false,
)
pushedLimit = math.MaxInt
}
@@ -157,16 +160,8 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest,
ss []logical.Schema) (
}
}
- // TODO: to support all aggregation functions
- needCompletePushDownAgg := criteria.GetAgg() != nil &&
- (criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX ||
- criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN ||
- criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM ||
- criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT) &&
- criteria.GetTop() == nil
-
- // parse fields
- plan := newUnresolvedDistributed(criteria, needCompletePushDownAgg)
+ pushDownAgg := criteria.GetAgg() != nil && criteria.GetTop() == nil
+ plan := newUnresolvedDistributed(criteria, pushDownAgg)
// parse limit and offset
limitParameter := criteria.GetLimit()
@@ -181,14 +176,12 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest,
ss []logical.Schema) (
}
if criteria.GetAgg() != nil {
- aggrFunc := criteria.GetAgg().GetFunction()
- if needCompletePushDownAgg && aggrFunc ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT {
- aggrFunc =
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM
- }
plan = newUnresolvedAggregation(plan,
logical.NewField(criteria.GetAgg().GetFieldName()),
- aggrFunc,
+ criteria.GetAgg().GetFunction(),
criteria.GetGroupBy() != nil,
+ false, // emitPartial: liaison does not emit
partial
+ pushDownAgg, // reduceMode: only reduce partials when
push-down is active (no TopN)
)
pushedLimit = math.MaxInt
}
diff --git a/pkg/query/logical/measure/measure_plan_aggregation.go b/pkg/query/logical/measure/measure_plan_aggregation.go
index ec2b06a68..cd6f75847 100644
--- a/pkg/query/logical/measure/measure_plan_aggregation.go
+++ b/pkg/query/logical/measure/measure_plan_aggregation.go
@@ -32,25 +32,116 @@ import (
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
+const aggCountFieldName = "__agg_count"
+
var (
_ logical.UnresolvedPlan = (*unresolvedAggregation)(nil)
errUnsupportedAggregationField = errors.New("unsupported aggregation
operation on this field")
)
+// aggAccumulator abstracts the aggregation logic for both map and reduce modes.
+// It is injected into the existing iterators to avoid creating separate iterator types.
+type aggAccumulator[N aggregation.Number] interface {
+ Feed(dp *measurev1.DataPoint, fieldIdx int) error
+ Result(fieldName string) ([]*measurev1.DataPoint_Field, error)
+ Reset()
+}
+
+// mapAccumulator implements aggAccumulator for the map phase (data node side).
+type mapAccumulator[N aggregation.Number] struct {
+ mapFunc aggregation.Map[N]
+ aggrType modelv1.AggregationFunction
+ emitPartial bool
+}
+
+func (a *mapAccumulator[N]) Feed(dp *measurev1.DataPoint, fieldIdx int) error {
+ v, parseErr :=
aggregation.FromFieldValue[N](dp.GetFields()[fieldIdx].GetValue())
+ if parseErr != nil {
+ return parseErr
+ }
+ a.mapFunc.In(v)
+ return nil
+}
+
+func (a *mapAccumulator[N]) Result(fieldName string)
([]*measurev1.DataPoint_Field, error) {
+ if a.emitPartial {
+ part := a.mapFunc.Partial()
+ fvs, partErr := aggregation.PartialToFieldValues(a.aggrType,
part)
+ if partErr != nil {
+ return nil, partErr
+ }
+ fields := make([]*measurev1.DataPoint_Field, len(fvs))
+ for idx, fv := range fvs {
+ name := fieldName
+ if idx > 0 {
+ name = aggCountFieldName
+ }
+ fields[idx] = &measurev1.DataPoint_Field{Name: name,
Value: fv}
+ }
+ return fields, nil
+ }
+ val, valErr := aggregation.ToFieldValue(a.mapFunc.Val())
+ if valErr != nil {
+ return nil, valErr
+ }
+ return []*measurev1.DataPoint_Field{{Name: fieldName, Value: val}}, nil
+}
+
+func (a *mapAccumulator[N]) Reset() {
+ a.mapFunc.Reset()
+}
+
+// reduceAccumulator implements aggAccumulator for the reduce phase (liaison side).
+type reduceAccumulator[N aggregation.Number] struct {
+ reduceFunc aggregation.Reduce[N]
+ aggrType modelv1.AggregationFunction
+}
+
+func (a *reduceAccumulator[N]) Feed(dp *measurev1.DataPoint, _ int) error {
+ fvs := make([]*modelv1.FieldValue, len(dp.GetFields()))
+ for idx, f := range dp.GetFields() {
+ fvs[idx] = f.GetValue()
+ }
+ part, partErr := aggregation.FieldValuesToPartial[N](a.aggrType, fvs)
+ if partErr != nil {
+ return partErr
+ }
+ a.reduceFunc.Combine(part)
+ return nil
+}
+
+func (a *reduceAccumulator[N]) Result(fieldName string)
([]*measurev1.DataPoint_Field, error) {
+ val, valErr := aggregation.ToFieldValue(a.reduceFunc.Val())
+ if valErr != nil {
+ return nil, valErr
+ }
+ return []*measurev1.DataPoint_Field{{Name: fieldName, Value: val}}, nil
+}
+
+func (a *reduceAccumulator[N]) Reset() {
+ a.reduceFunc.Reset()
+}
+
type unresolvedAggregation struct {
unresolvedInput logical.UnresolvedPlan
aggregationField *logical.Field
aggrFunc modelv1.AggregationFunction
isGroup bool
+ emitPartial bool
+ reduceMode bool
}
-func newUnresolvedAggregation(input logical.UnresolvedPlan, aggrField
*logical.Field, aggrFunc modelv1.AggregationFunction, isGroup bool)
logical.UnresolvedPlan {
+func newUnresolvedAggregation(input logical.UnresolvedPlan, aggrField
*logical.Field, aggrFunc modelv1.AggregationFunction,
+ isGroup bool, emitPartial bool, reduceMode bool,
+) logical.UnresolvedPlan {
return &unresolvedAggregation{
unresolvedInput: input,
aggrFunc: aggrFunc,
aggregationField: aggrField,
isGroup: isGroup,
+ emitPartial: emitPartial,
+ reduceMode: reduceMode,
}
}
@@ -83,7 +174,7 @@ type aggregationPlan[N aggregation.Number] struct {
*logical.Parent
schema logical.Schema
aggregationFieldRef *logical.FieldRef
- aggrFunc aggregation.Func[N]
+ accumulator aggAccumulator[N]
aggrType modelv1.AggregationFunction
isGroup bool
}
@@ -91,9 +182,19 @@ type aggregationPlan[N aggregation.Number] struct {
func newAggregationPlan[N aggregation.Number](gba *unresolvedAggregation,
prevPlan logical.Plan,
measureSchema logical.Schema, fieldRef *logical.FieldRef,
) (*aggregationPlan[N], error) {
- aggrFunc, err := aggregation.NewFunc[N](gba.aggrFunc)
- if err != nil {
- return nil, err
+ var acc aggAccumulator[N]
+ if gba.reduceMode {
+ reduceFunc, reduceErr := aggregation.NewReduce[N](gba.aggrFunc)
+ if reduceErr != nil {
+ return nil, reduceErr
+ }
+ acc = &reduceAccumulator[N]{reduceFunc: reduceFunc, aggrType:
gba.aggrFunc}
+ } else {
+ mapFunc, mapErr := aggregation.NewMap[N](gba.aggrFunc)
+ if mapErr != nil {
+ return nil, mapErr
+ }
+ acc = &mapAccumulator[N]{mapFunc: mapFunc, aggrType:
gba.aggrFunc, emitPartial: gba.emitPartial}
}
return &aggregationPlan[N]{
Parent: &logical.Parent{
@@ -101,8 +202,9 @@ func newAggregationPlan[N aggregation.Number](gba
*unresolvedAggregation, prevPl
Input: prevPlan,
},
schema: measureSchema,
- aggrFunc: aggrFunc,
+ accumulator: acc,
aggregationFieldRef: fieldRef,
+ aggrType: gba.aggrFunc,
isGroup: gba.isGroup,
}, nil
}
@@ -128,28 +230,27 @@ func (g *aggregationPlan[N]) Execute(ec context.Context)
(executor.MIterator, er
return nil, err
}
if g.isGroup {
- return newAggGroupMIterator(iter, g.aggregationFieldRef,
g.aggrFunc), nil
+ return newAggGroupMIterator[N](iter, g.aggregationFieldRef,
g.accumulator), nil
}
- return newAggAllIterator(iter, g.aggregationFieldRef, g.aggrFunc), nil
+ return newAggAllIterator[N](iter, g.aggregationFieldRef,
g.accumulator), nil
}
type aggGroupIterator[N aggregation.Number] struct {
prev executor.MIterator
aggregationFieldRef *logical.FieldRef
- aggrFunc aggregation.Func[N]
-
- err error
+ accumulator aggAccumulator[N]
+ err error
}
func newAggGroupMIterator[N aggregation.Number](
prev executor.MIterator,
aggregationFieldRef *logical.FieldRef,
- aggrFunc aggregation.Func[N],
+ accumulator aggAccumulator[N],
) executor.MIterator {
return &aggGroupIterator[N]{
prev: prev,
aggregationFieldRef: aggregationFieldRef,
- aggrFunc: aggrFunc,
+ accumulator: accumulator,
}
}
@@ -164,20 +265,16 @@ func (ami *aggGroupIterator[N]) Current()
[]*measurev1.InternalDataPoint {
if ami.err != nil {
return nil
}
- ami.aggrFunc.Reset()
+ ami.accumulator.Reset()
group := ami.prev.Current()
var resultDp *measurev1.DataPoint
var shardID uint32
for _, idp := range group {
dp := idp.GetDataPoint()
- value := dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
- GetValue()
- v, err := aggregation.FromFieldValue[N](value)
- if err != nil {
- ami.err = err
+ if feedErr := ami.accumulator.Feed(dp,
ami.aggregationFieldRef.Spec.FieldIdx); feedErr != nil {
+ ami.err = feedErr
return nil
}
- ami.aggrFunc.In(v)
if resultDp != nil {
continue
}
@@ -189,17 +286,12 @@ func (ami *aggGroupIterator[N]) Current()
[]*measurev1.InternalDataPoint {
if resultDp == nil {
return nil
}
- val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
- if err != nil {
- ami.err = err
+ fields, resultErr :=
ami.accumulator.Result(ami.aggregationFieldRef.Field.Name)
+ if resultErr != nil {
+ ami.err = resultErr
return nil
}
- resultDp.Fields = []*measurev1.DataPoint_Field{
- {
- Name: ami.aggregationFieldRef.Field.Name,
- Value: val,
- },
- }
+ resultDp.Fields = fields
return []*measurev1.InternalDataPoint{{DataPoint: resultDp, ShardId:
shardID}}
}
@@ -210,21 +302,20 @@ func (ami *aggGroupIterator[N]) Close() error {
type aggAllIterator[N aggregation.Number] struct {
prev executor.MIterator
aggregationFieldRef *logical.FieldRef
- aggrFunc aggregation.Func[N]
-
- result *measurev1.DataPoint
- err error
+ accumulator aggAccumulator[N]
+ result *measurev1.DataPoint
+ err error
}
func newAggAllIterator[N aggregation.Number](
prev executor.MIterator,
aggregationFieldRef *logical.FieldRef,
- aggrFunc aggregation.Func[N],
+ accumulator aggAccumulator[N],
) executor.MIterator {
return &aggAllIterator[N]{
prev: prev,
aggregationFieldRef: aggregationFieldRef,
- aggrFunc: aggrFunc,
+ accumulator: accumulator,
}
}
@@ -237,14 +328,10 @@ func (ami *aggAllIterator[N]) Next() bool {
group := ami.prev.Current()
for _, idp := range group {
dp := idp.GetDataPoint()
- value :=
dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
- GetValue()
- v, err := aggregation.FromFieldValue[N](value)
- if err != nil {
- ami.err = err
+ if feedErr := ami.accumulator.Feed(dp,
ami.aggregationFieldRef.Spec.FieldIdx); feedErr != nil {
+ ami.err = feedErr
return false
}
- ami.aggrFunc.In(v)
if resultDp != nil {
continue
}
@@ -256,17 +343,12 @@ func (ami *aggAllIterator[N]) Next() bool {
if resultDp == nil {
return false
}
- val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
- if err != nil {
- ami.err = err
+ fields, resultErr :=
ami.accumulator.Result(ami.aggregationFieldRef.Field.Name)
+ if resultErr != nil {
+ ami.err = resultErr
return false
}
- resultDp.Fields = []*measurev1.DataPoint_Field{
- {
- Name: ami.aggregationFieldRef.Field.Name,
- Value: val,
- },
- }
+ resultDp.Fields = fields
ami.result = resultDp
return true
}
@@ -275,10 +357,9 @@ func (ami *aggAllIterator[N]) Current()
[]*measurev1.InternalDataPoint {
if ami.result == nil {
return nil
}
- // For aggregation across all data, shard ID is not applicable
return []*measurev1.InternalDataPoint{{DataPoint: ami.result, ShardId:
0}}
}
func (ami *aggAllIterator[N]) Close() error {
- return ami.prev.Close()
+ return multierr.Combine(ami.err, ami.prev.Close())
}
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index d81d3f682..69fce0659 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -103,15 +103,15 @@ func (as *pushDownAggSchema) Children() []logical.Schema {
}
type unresolvedDistributed struct {
- originalQuery *measurev1.QueryRequest
- groupByEntity bool
- needCompletePushDownAgg bool
+ originalQuery *measurev1.QueryRequest
+ groupByEntity bool
+ pushDownAgg bool
}
-func newUnresolvedDistributed(query *measurev1.QueryRequest,
needCompletePushDownAgg bool) logical.UnresolvedPlan {
+func newUnresolvedDistributed(query *measurev1.QueryRequest, pushDownAgg bool)
logical.UnresolvedPlan {
return &unresolvedDistributed{
- originalQuery: query,
- needCompletePushDownAgg: needCompletePushDownAgg,
+ originalQuery: query,
+ pushDownAgg: pushDownAgg,
}
}
@@ -150,7 +150,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema)
(logical.Plan, error)
Limit: limit + ud.originalQuery.Offset,
OrderBy: ud.originalQuery.OrderBy,
}
- if ud.needCompletePushDownAgg {
+ if ud.pushDownAgg {
temp.GroupBy = ud.originalQuery.GroupBy
temp.Agg = ud.originalQuery.Agg
}
@@ -163,7 +163,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema)
(logical.Plan, error)
}
// Prepare groupBy tags refs if needed for deduplication
var groupByTagsRefs [][]*logical.TagRef
- if ud.needCompletePushDownAgg && ud.originalQuery.GetGroupBy() != nil {
+ if ud.pushDownAgg && ud.originalQuery.GetGroupBy() != nil {
groupByTags :=
logical.ToTags(ud.originalQuery.GetGroupBy().GetTagProjection())
var err error
groupByTagsRefs, err = s.CreateTagRef(groupByTags...)
@@ -179,12 +179,12 @@ func (ud *unresolvedDistributed) Analyze(s
logical.Schema) (logical.Plan, error)
return nil, fmt.Errorf("entity tag %s not found", e)
}
result := &distributedPlan{
- queryTemplate: temp,
- s: s,
- sortByTime: false,
- sortTagSpec: *sortTagSpec,
- needCompletePushDownAgg: ud.needCompletePushDownAgg,
- groupByTagsRefs: groupByTagsRefs,
+ queryTemplate: temp,
+ s: s,
+ sortByTime: false,
+ sortTagSpec: *sortTagSpec,
+ pushDownAgg: ud.pushDownAgg,
+ groupByTagsRefs: groupByTagsRefs,
}
if ud.originalQuery.OrderBy != nil &&
ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -193,20 +193,20 @@ func (ud *unresolvedDistributed) Analyze(s
logical.Schema) (logical.Plan, error)
}
if ud.originalQuery.OrderBy == nil {
return &distributedPlan{
- queryTemplate: temp,
- s: s,
- sortByTime: true,
- needCompletePushDownAgg: ud.needCompletePushDownAgg,
- groupByTagsRefs: groupByTagsRefs,
+ queryTemplate: temp,
+ s: s,
+ sortByTime: true,
+ pushDownAgg: ud.pushDownAgg,
+ groupByTagsRefs: groupByTagsRefs,
}, nil
}
if ud.originalQuery.OrderBy.IndexRuleName == "" {
result := &distributedPlan{
- queryTemplate: temp,
- s: s,
- sortByTime: true,
- needCompletePushDownAgg: ud.needCompletePushDownAgg,
- groupByTagsRefs: groupByTagsRefs,
+ queryTemplate: temp,
+ s: s,
+ sortByTime: true,
+ pushDownAgg: ud.pushDownAgg,
+ groupByTagsRefs: groupByTagsRefs,
}
if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -225,12 +225,12 @@ func (ud *unresolvedDistributed) Analyze(s
logical.Schema) (logical.Plan, error)
return nil, fmt.Errorf("tag %s not found", indexRule.Tags[0])
}
result := &distributedPlan{
- queryTemplate: temp,
- s: s,
- sortByTime: false,
- sortTagSpec: *sortTagSpec,
- needCompletePushDownAgg: ud.needCompletePushDownAgg,
- groupByTagsRefs: groupByTagsRefs,
+ queryTemplate: temp,
+ s: s,
+ sortByTime: false,
+ sortTagSpec: *sortTagSpec,
+ pushDownAgg: ud.pushDownAgg,
+ groupByTagsRefs: groupByTagsRefs,
}
if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -239,14 +239,14 @@ func (ud *unresolvedDistributed) Analyze(s
logical.Schema) (logical.Plan, error)
}
type distributedPlan struct {
- s logical.Schema
- queryTemplate *measurev1.QueryRequest
- sortTagSpec logical.TagSpec
- groupByTagsRefs [][]*logical.TagRef
- maxDataPointsSize uint32
- sortByTime bool
- desc bool
- needCompletePushDownAgg bool
+ s logical.Schema
+ queryTemplate *measurev1.QueryRequest
+ sortTagSpec logical.TagSpec
+ groupByTagsRefs [][]*logical.TagRef
+ maxDataPointsSize uint32
+ sortByTime bool
+ desc bool
+ pushDownAgg bool
}
func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator,
err error) {
@@ -272,7 +272,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi
executor.MIterator, e
}
}()
}
- internalRequest := &measurev1.InternalQueryRequest{Request: queryRequest}
+ internalRequest := &measurev1.InternalQueryRequest{Request: queryRequest, AggReturnPartial: t.pushDownAgg}
ff, broadcastErr := dctx.Broadcast(defaultQueryTimeout,
data.TopicInternalMeasureQuery,
bus.NewMessageWithNodeSelectors(bus.MessageID(dctx.TimeRange().Begin.Nanos),
dctx.NodeSelectors(), dctx.TimeRange(), internalRequest))
if broadcastErr != nil {
@@ -292,7 +292,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi
executor.MIterator, e
if span != nil {
span.AddSubTrace(d.Trace)
}
- if t.needCompletePushDownAgg {
+ if t.pushDownAgg {
pushedDownAggDps =
append(pushedDownAggDps, d.DataPoints...)
dataPointCount += len(d.DataPoints)
continue
@@ -310,7 +310,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi
executor.MIterator, e
span.Tagf("response_count", "%d", responseCount)
span.Tagf("data_point_count", "%d", dataPointCount)
}
- if t.needCompletePushDownAgg {
+ if t.pushDownAgg {
deduplicatedDps, dedupErr :=
deduplicateAggregatedDataPointsWithShard(pushedDownAggDps, t.groupByTagsRefs)
if dedupErr != nil {
return nil, multierr.Append(err, dedupErr)
@@ -333,7 +333,7 @@ func (t *distributedPlan) Children() []logical.Plan {
}
func (t *distributedPlan) Schema() logical.Schema {
- if t.needCompletePushDownAgg {
+ if t.pushDownAgg {
return &pushDownAggSchema{
originalSchema: t.s,
aggregationField:
logical.NewField(t.queryTemplate.Agg.FieldName),
@@ -546,7 +546,16 @@ func (s *pushedDownAggregatedIterator) Close() error {
// of the same shard, while preserving results from different shards.
func deduplicateAggregatedDataPointsWithShard(dataPoints
[]*measurev1.InternalDataPoint, groupByTagsRefs [][]*logical.TagRef)
([]*measurev1.InternalDataPoint, error) {
if len(groupByTagsRefs) == 0 {
- return dataPoints, nil
+ // No group-by: deduplicate by shard_id only
+ seen := make(map[uint32]struct{})
+ result := make([]*measurev1.InternalDataPoint, 0, len(dataPoints))
+ for _, idp := range dataPoints {
+ if _, exists := seen[idp.GetShardId()]; !exists {
+ seen[idp.GetShardId()] = struct{}{}
+ result = append(result, idp)
+ }
+ }
+ return result, nil
}
// key = hash(shard_id, group_key)
// Same shard with same group key will be deduplicated
@@ -558,7 +567,6 @@ func deduplicateAggregatedDataPointsWithShard(dataPoints
[]*measurev1.InternalDa
if keyErr != nil {
return nil, keyErr
}
- // Include shard_id in key calculation
key := hashWithShard(uint64(idp.ShardId), groupKey)
if _, exists := groupMap[key]; !exists {
groupMap[key] = struct{}{}
diff --git a/pkg/query/logical/measure/topn_analyzer.go b/pkg/query/logical/measure/topn_analyzer.go
index 34aced932..cf2d87b68 100644
--- a/pkg/query/logical/measure/topn_analyzer.go
+++ b/pkg/query/logical/measure/topn_analyzer.go
@@ -116,7 +116,9 @@ func TopNAnalyze(criteria *measurev1.TopNRequest,
sourceMeasureSchemaList []*dat
plan = newUnresolvedAggregation(plan,
&logical.Field{Name: topNAggSchema.FieldName},
criteria.GetAgg(),
- true)
+ true,
+ false,
+ false)
}
plan = top(plan, &measurev1.QueryRequest_Top{
diff --git a/test/cases/measure/data/input/group_mean.ql b/test/cases/measure/data/input/group_mean.ql
new file mode 100644
index 000000000..61e648a32
--- /dev/null
+++ b/test/cases/measure/data/input/group_mean.ql
@@ -0,0 +1,21 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT id, total::field, value::field, MEAN(value) FROM MEASURE service_cpm_minute IN sw_metric
+TIME > '-15m'
+GROUP BY id, value
\ No newline at end of file
diff --git a/test/cases/measure/data/input/group_mean.yaml b/test/cases/measure/data/input/group_mean.yaml
new file mode 100644
index 000000000..b35af2e24
--- /dev/null
+++ b/test/cases/measure/data/input/group_mean.yaml
@@ -0,0 +1,34 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_cpm_minute"
+groups: ["sw_metric"]
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["id"]
+fieldProjection:
+ names: ["total", "value"]
+groupBy:
+ tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["id"]
+ fieldName: "value"
+agg:
+ function: "AGGREGATION_FUNCTION_MEAN"
+ fieldName: "value"
\ No newline at end of file
diff --git a/test/cases/measure/data/want/group_mean.yaml b/test/cases/measure/data/want/group_mean.yaml
new file mode 100644
index 000000000..82d0e8cdd
--- /dev/null
+++ b/test/cases/measure/data/want/group_mean.yaml
@@ -0,0 +1,54 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+ - name: value
+ value:
+ int:
+ value: "2"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: svc1
+- fields:
+ - name: value
+ value:
+ int:
+ value: "4"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: svc2
+- fields:
+ - name: value
+ value:
+ int:
+ value: "6"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: svc3
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 2679c8b33..fadf4ee02 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -52,6 +52,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("group and min", helpers.Args{Input: "group_min", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
g.Entry("group and sum", helpers.Args{Input: "group_sum", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
g.Entry("group and count", helpers.Args{Input: "group_count", Duration:
25 * time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("group and mean", helpers.Args{Input: "group_mean", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("group without field", helpers.Args{Input: "group_no_field",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("top 2 by id", helpers.Args{Input: "top", Duration: 25 *
time.Minute, Offset: -20 * time.Minute}),
g.Entry("bottom 2 by id", helpers.Args{Input: "bottom", Duration: 25 *
time.Minute, Offset: -20 * time.Minute}),