Copilot commented on code in PR #932:
URL: https://github.com/apache/skywalking-banyandb/pull/932#discussion_r2674986051


##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -556,3 +566,64 @@ func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByT
        }
        return result, nil
 }
+
+// aggregateCountDataPoints aggregates count results by summing values per group.
+func aggregateCountDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef, fieldName string) ([]*measurev1.DataPoint, error) {
+       groupMap := make(map[uint64][]*measurev1.DataPoint)
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKey(dp, groupByTagsRefs)
+               if err != nil {
+                       return nil, err
+               }
+               groupMap[key] = append(groupMap[key], dp)
+       }
+       result := make([]*measurev1.DataPoint, 0, len(groupMap))
+       for _, dps := range groupMap {
+               if len(dps) == 0 {
+                       continue
+               }
+               // First deduplicate: collect unique count values (for replica deduplication)
+               uniqueCounts := make(map[int64]struct{})
+               var firstDp *measurev1.DataPoint
+               for _, dp := range dps {
+                       if firstDp == nil {
+                               firstDp = dp
+                       }
+                       for _, field := range dp.GetFields() {
+                               if field.GetName() == fieldName {
+                                       fieldValue := field.GetValue()
+                                       var countVal int64
+                                       switch v := fieldValue.GetValue().(type) {
+                                       case *modelv1.FieldValue_Int:
+                                               countVal = v.Int.GetValue()
+                                       case *modelv1.FieldValue_Float:
+                                               countVal = int64(v.Float.GetValue())
+                                       default:
+                                               continue
+                                       }
+                                       uniqueCounts[countVal] = struct{}{}
+                                       break
+                               }
+                       }
+               }
+               // Then sum all unique count values
+               var sumValue int64
+               for countVal := range uniqueCounts {
+                       sumValue += countVal
+               }

Review Comment:
   The deduplication logic using a set of unique count values (`map[int64]struct{}`) is flawed when multiple shards happen to report the same count value for different data. For example:
   
   - Shard 1, Replica A: count = 5 (for 5 unique items in partition 1)
   - Shard 1, Replica B: count = 5 (replica of partition 1)
   - Shard 2, Replica A: count = 5 (for 5 unique items in partition 2, 
different data)
   - Shard 2, Replica B: count = 5 (replica of partition 2)
   
   The current logic would collect uniqueCounts = {5} and sum to 5, when the 
correct answer should be 10 (5 from shard 1 + 5 from shard 2).
   
   For correct replica deduplication with shard aggregation, the logic needs to 
identify which data points come from replicas of the same shard (same data) 
versus different shards (different data). The current approach of using count 
values for deduplication is insufficient because it can't distinguish between 
identical counts from replicas versus coincidentally equal counts from 
different shards.



##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -161,7 +161,8 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, ss []logical.Schema) (
        needCompletePushDownAgg := criteria.GetAgg() != nil &&
                (criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX ||
                        criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN ||
-                       criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM) &&
+                       criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM ||
+                       criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT) &&
                criteria.GetTop() == nil

Review Comment:
   Missing parentheses around the compound OR expression causes incorrect 
operator precedence. The `&&` operator has higher precedence than `||`, so the 
current code is evaluated as:
   
   `needCompletePushDownAgg := criteria.GetAgg() != nil && (... || ... || SUM 
|| (COUNT && criteria.GetTop() == nil))`
   
   This means COUNT aggregation would only be considered for push-down when 
`criteria.GetTop() == nil`, while other aggregation functions (MAX, MIN, SUM) 
would be considered regardless of the Top condition. The intended logic should 
apply the `criteria.GetTop() == nil` condition to all aggregation functions 
uniformly.
   
   Add parentheses to ensure the entire OR expression is evaluated before the 
`&& criteria.GetTop() == nil` condition is applied.



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -556,3 +566,64 @@ func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByT
        }
        return result, nil
 }
+
+// aggregateCountDataPoints aggregates count results by summing values per group.
+func aggregateCountDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef, fieldName string) ([]*measurev1.DataPoint, error) {
+       groupMap := make(map[uint64][]*measurev1.DataPoint)
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKey(dp, groupByTagsRefs)
+               if err != nil {
+                       return nil, err
+               }
+               groupMap[key] = append(groupMap[key], dp)
+       }

Review Comment:
   The function aggregateCountDataPoints doesn't check if groupByTagsRefs is 
empty before calling formatGroupByKey. When groupByTagsRefs is empty (no GROUP 
BY clause), formatGroupByKey will return the same hash for all data points, 
causing all counts to be grouped together. This would work, but is inconsistent 
with the deduplicateAggregatedDataPoints function which explicitly checks for 
empty groupByTagsRefs at line 552 and returns early.
   
   For consistency and to avoid unnecessary processing, add a check for empty 
groupByTagsRefs at the beginning of aggregateCountDataPoints similar to the one 
in deduplicateAggregatedDataPoints.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
