Copilot commented on code in PR #932:
URL: https://github.com/apache/skywalking-banyandb/pull/932#discussion_r2674986051
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -556,3 +566,64 @@ func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByT
     }
     return result, nil
 }
+
+// aggregateCountDataPoints aggregates count results by summing values per group.
+func aggregateCountDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef, fieldName string) ([]*measurev1.DataPoint, error) {
+    groupMap := make(map[uint64][]*measurev1.DataPoint)
+    for _, dp := range dataPoints {
+        key, err := formatGroupByKey(dp, groupByTagsRefs)
+        if err != nil {
+            return nil, err
+        }
+        groupMap[key] = append(groupMap[key], dp)
+    }
+    result := make([]*measurev1.DataPoint, 0, len(groupMap))
+    for _, dps := range groupMap {
+        if len(dps) == 0 {
+            continue
+        }
+        // First deduplicate: collect unique count values (for replica deduplication)
+        uniqueCounts := make(map[int64]struct{})
+        var firstDp *measurev1.DataPoint
+        for _, dp := range dps {
+            if firstDp == nil {
+                firstDp = dp
+            }
+            for _, field := range dp.GetFields() {
+                if field.GetName() == fieldName {
+                    fieldValue := field.GetValue()
+                    var countVal int64
+                    switch v := fieldValue.GetValue().(type) {
+                    case *modelv1.FieldValue_Int:
+                        countVal = v.Int.GetValue()
+                    case *modelv1.FieldValue_Float:
+                        countVal = int64(v.Float.GetValue())
+                    default:
+                        continue
+                    }
+                    uniqueCounts[countVal] = struct{}{}
+                    break
+                }
+            }
+        }
+        // Then sum all unique count values
+        var sumValue int64
+        for countVal := range uniqueCounts {
+            sumValue += countVal
+        }
Review Comment:
The deduplication logic using a set of unique count values
(map[int64]struct{}) is flawed when multiple shards happen to have the same
count value for different data. For example:
- Shard 1, Replica A: count = 5 (for 5 unique items in partition 1)
- Shard 1, Replica B: count = 5 (replica of partition 1)
- Shard 2, Replica A: count = 5 (for 5 unique items in partition 2,
different data)
- Shard 2, Replica B: count = 5 (replica of partition 2)
The current logic would collect uniqueCounts = {5} and sum to 5, when the
correct answer is 10 (5 from shard 1 + 5 from shard 2).
For correct replica deduplication combined with cross-shard aggregation, the
logic needs to identify which data points are replicas of the same shard
(same data) and which come from different shards (different data).
Deduplicating by count value cannot make that distinction: identical counts
from replicas are indistinguishable from coincidentally equal counts from
different shards.
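To make the distinction concrete, here is a minimal, self-contained sketch
(plain stand-in types, not the actual BanyanDB ones) where deduplication is
keyed by a hypothetical shardID carried on each count result instead of by
the count value. How a shard identity would actually be exposed to this
layer (e.g. a shard-ID tag on the response) is an assumption, not an
existing API:

package main

import "fmt"

// countPoint is a stand-in for a per-shard COUNT result; the shardID field
// is assumed, not part of the existing measurev1.DataPoint message.
type countPoint struct {
    shardID uint32
    count   int64
}

// sumDedupByShard keeps one contribution per shard: replicas (same shardID)
// collapse, while distinct shards with coincidentally equal counts still
// both contribute to the sum.
func sumDedupByShard(dps []countPoint) int64 {
    seen := make(map[uint32]struct{})
    var sum int64
    for _, dp := range dps {
        if _, ok := seen[dp.shardID]; ok {
            continue // replica of an already-counted shard
        }
        seen[dp.shardID] = struct{}{}
        sum += dp.count
    }
    return sum
}

func main() {
    dps := []countPoint{
        {shardID: 1, count: 5}, // shard 1, replica A
        {shardID: 1, count: 5}, // shard 1, replica B
        {shardID: 2, count: 5}, // shard 2, replica A
        {shardID: 2, count: 5}, // shard 2, replica B
    }
    fmt.Println(sumDedupByShard(dps)) // prints 10; value-keyed dedup would yield 5
}

With shard-keyed deduplication the four points above collapse to one per
shard and sum to 10, whereas the value-keyed set collapses all of them into
a single 5.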
##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -161,7 +161,8 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, ss []logical.Schema) (
     needCompletePushDownAgg := criteria.GetAgg() != nil &&
         (criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX ||
             criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN ||
-            criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM) &&
+            criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM ||
+            criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT) &&
         criteria.GetTop() == nil
Review Comment:
Missing parentheses around the compound OR expression cause unintended
operator grouping. The `&&` operator binds more tightly than `||`, so the
current code is evaluated as:
`needCompletePushDownAgg := criteria.GetAgg() != nil && (... || ... || SUM
|| (COUNT && criteria.GetTop() == nil))`
As a result, COUNT aggregation would be considered for push-down only when
`criteria.GetTop() == nil`, while the other aggregation functions (MAX, MIN,
SUM) would be considered regardless of the Top condition. The intended logic
applies the `criteria.GetTop() == nil` condition uniformly to all
aggregation functions.
Add parentheses so the entire OR expression is evaluated before the
`&& criteria.GetTop() == nil` condition is applied.
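To see the difference in isolation, here is a standalone snippet with
boolean stand-ins for the comparisons (isMax for the MAX check, noTop for
`criteria.GetTop() == nil`, and so on); the names are illustrative only:

package main

import "fmt"

func main() {
    // Scenario: a MAX aggregation is requested together with a Top clause.
    isAgg, isMax, isMin, isSum, isCount, noTop := true, true, false, false, false, false

    // && binds tighter than ||, so without grouping parentheses noTop only
    // guards the COUNT term; MAX/MIN/SUM slip through even when Top is set.
    unguarded := isAgg && (isMax || isMin || isSum || isCount && noTop)

    // With the whole OR group parenthesized, noTop guards every function.
    guarded := isAgg && (isMax || isMin || isSum || isCount) && noTop

    fmt.Println(unguarded, guarded) // prints: true false
}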
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -556,3 +566,64 @@ func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByT
     }
     return result, nil
 }
+
+// aggregateCountDataPoints aggregates count results by summing values per group.
+func aggregateCountDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef, fieldName string) ([]*measurev1.DataPoint, error) {
+    groupMap := make(map[uint64][]*measurev1.DataPoint)
+    for _, dp := range dataPoints {
+        key, err := formatGroupByKey(dp, groupByTagsRefs)
+        if err != nil {
+            return nil, err
+        }
+        groupMap[key] = append(groupMap[key], dp)
+    }
Review Comment:
The function aggregateCountDataPoints doesn't check whether groupByTagsRefs
is empty before calling formatGroupByKey. When groupByTagsRefs is empty (no
GROUP BY clause), formatGroupByKey returns the same hash for every data
point, so all counts end up in a single group. That happens to work, but it
is inconsistent with deduplicateAggregatedDataPoints, which explicitly
checks for empty groupByTagsRefs at line 552 and returns early.
For consistency, and to avoid hashing every data point to the same key, add
a check for empty groupByTagsRefs at the beginning of
aggregateCountDataPoints, similar to the one in
deduplicateAggregatedDataPoints.
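As a sketch, the guard could look like the following; stand-in types are
used so the snippet runs on its own (the real parameters are
[]*measurev1.DataPoint and [][]*logical.TagRef), and the exact early-return
value would need to mirror what deduplicateAggregatedDataPoints does for the
no-GROUP-BY case:

package main

import "fmt"

// aggregateCounts mirrors the shape of aggregateCountDataPoints with
// simplified types: points stands in for the per-shard count values and
// groupRefs for groupByTagsRefs.
func aggregateCounts(points []int64, groupRefs []string) []int64 {
    if len(groupRefs) == 0 {
        // No GROUP BY clause: all points form one implicit group, so handle
        // them directly instead of hashing every point to the same key.
        // (For illustration this branch simply sums; the real early return
        // would still need the same replica handling as the grouped path.)
        var sum int64
        for _, p := range points {
            sum += p
        }
        return []int64{sum}
    }
    // Grouped path elided; the existing groupMap logic would follow here.
    return points
}

func main() {
    fmt.Println(aggregateCounts([]int64{5, 3, 2}, nil)) // prints [10]
}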
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]