Copilot commented on code in PR #924:
URL: 
https://github.com/apache/skywalking-banyandb/pull/924#discussion_r2661171843


##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() 
[]*measurev1.DataPoint {
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from 
multiple replicas
+// by keeping only one data point per group. Since aggregated results from 
different replicas
+// for the same group should be identical, we can safely keep only one per 
group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+       if len(groupByTagsRefs) == 0 {
+               // No groupBy, return as is (should not happen for pushed down 
aggregation)
+               return dataPoints
+       }
+
+       groupMap := make(map[uint64]*measurev1.DataPoint)
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
+               if err != nil {
+                       // If we can't compute the key, keep the data point
+                       continue
+               }
+               // Keep the first data point for each group
+               if _, exists := groupMap[key]; !exists {
+                       groupMap[key] = dp
+               }
+       }
+
+       result := make([]*measurev1.DataPoint, 0, len(groupMap))
+       for _, dp := range groupMap {
+               result = append(result, dp)
+       }
+       return result
+}
+
+// formatGroupByKeyForDedup computes a hash key for a data point based on 
groupBy tags.
+func formatGroupByKeyForDedup(point *measurev1.DataPoint, groupByTagsRefs 
[][]*logical.TagRef) (uint64, error) {
+       hash := xxhash.New()
+       for _, tagFamilyRef := range groupByTagsRefs {
+               for _, tagRef := range tagFamilyRef {
+                       if tagRef.Spec.TagFamilyIdx >= 
len(point.GetTagFamilies()) {
+                               return 0, fmt.Errorf("tag family index out of 
range")
+                       }
+                       if tagRef.Spec.TagIdx >= 
len(point.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()) {
+                               return 0, fmt.Errorf("tag index out of range")
+                       }
+                       tag := 
point.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()[tagRef.Spec.TagIdx]
+                       switch v := tag.GetValue().GetValue().(type) {
+                       case *modelv1.TagValue_Str:
+                               _, err := hash.Write([]byte(v.Str.GetValue()))
+                               if err != nil {
+                                       return 0, err
+                               }
+                       case *modelv1.TagValue_Int:
+                               _, err := 
hash.Write(convert.Int64ToBytes(v.Int.GetValue()))
+                               if err != nil {
+                                       return 0, err
+                               }
+                       case *modelv1.TagValue_IntArray, 
*modelv1.TagValue_StrArray, *modelv1.TagValue_BinaryData:
+                               return 0, fmt.Errorf("group-by on array/binary 
tag is not supported")

Review Comment:
   The switch statement doesn't have a default case to handle unexpected tag 
value types. If a new tag value type is added to the modelv1.TagValue enum in 
the future, it will be silently ignored (returning 0 hash and nil error), which 
could lead to incorrect deduplication behavior.
   
   Add a default case that returns an error for unknown tag value types to make 
the code more maintainable and catch potential issues early.
   ```suggestion
                                return 0, fmt.Errorf("group-by on array/binary 
tag is not supported")
                        default:
                                return 0, fmt.Errorf("unsupported tag value 
type %T in group-by", v)
   ```



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() 
[]*measurev1.DataPoint {
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from 
multiple replicas
+// by keeping only one data point per group. Since aggregated results from 
different replicas
+// for the same group should be identical, we can safely keep only one per 
group.

Review Comment:
   The comment states "Since aggregated results from different replicas for the 
same group should be identical, we can safely keep only one per group." This 
assumption is correct for MAX and MIN aggregations, but it's worth clarifying 
in the comment that this applies to SUM as well. In a properly replicated 
database system, all replicas contain identical data, so SUM(data) from replica 
A should equal SUM(data) from replica B. The deduplication is removing 
duplicate computation results, not combining partial sums. Consider updating 
the comment to make this clearer to avoid confusion.
   ```suggestion
   // by keeping only one data point per group. In a properly replicated 
system, each replica
   // holds identical data, so any aggregate (including SUM, MIN, MAX, etc.) 
for the same group
   // is expected to be identical across replicas. This function therefore 
drops duplicate
   // full aggregation results from different replicas, rather than combining 
partial aggregates.
   ```



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() 
[]*measurev1.DataPoint {
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from 
multiple replicas
+// by keeping only one data point per group. Since aggregated results from 
different replicas
+// for the same group should be identical, we can safely keep only one per 
group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+       if len(groupByTagsRefs) == 0 {
+               // No groupBy, return as is (should not happen for pushed down 
aggregation)
+               return dataPoints
+       }
+
+       groupMap := make(map[uint64]*measurev1.DataPoint)
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
+               if err != nil {
+                       // If we can't compute the key, keep the data point
+                       continue
+               }
+               // Keep the first data point for each group
+               if _, exists := groupMap[key]; !exists {
+                       groupMap[key] = dp
+               }
+       }
+
+       result := make([]*measurev1.DataPoint, 0, len(groupMap))
+       for _, dp := range groupMap {
+               result = append(result, dp)

Review Comment:
   The result order from deduplication is nondeterministic because map 
iteration order in Go is randomized. This means the same query could return 
results in different orders on different executions, even with the same input 
data. This could cause issues for clients that rely on consistent ordering or 
for testing.
   
   Consider either: (1) sorting the results before returning them, or (2) 
documenting that deduplication does not preserve order. If consistent ordering 
is required by the system, sorting should be added.
   ```suggestion
        keys := make([]uint64, 0, len(dataPoints))
        for _, dp := range dataPoints {
                key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
                if err != nil {
                        // If we can't compute the key, keep the data point
                        continue
                }
                // Keep the first data point for each group and record 
insertion order
                if _, exists := groupMap[key]; !exists {
                        groupMap[key] = dp
                        keys = append(keys, key)
                }
        }
   
        result := make([]*measurev1.DataPoint, 0, len(keys))
        for _, key := range keys {
                if dp, ok := groupMap[key]; ok {
                        result = append(result, dp)
                }
   ```



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() 
[]*measurev1.DataPoint {
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from 
multiple replicas
+// by keeping only one data point per group. Since aggregated results from 
different replicas
+// for the same group should be identical, we can safely keep only one per 
group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+       if len(groupByTagsRefs) == 0 {
+               // No groupBy, return as is (should not happen for pushed down 
aggregation)
+               return dataPoints
+       }
+
+       groupMap := make(map[uint64]*measurev1.DataPoint)
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
+               if err != nil {
+                       // If we can't compute the key, keep the data point

Review Comment:
   Silently skipping data points with hash computation errors can lead to 
incorrect aggregation results and data loss. When an error occurs (such as tag 
family/tag index out of range or unsupported tag types), the data point is 
dropped without any logging or error propagation. This makes debugging 
difficult and could produce incorrect query results without the user being 
aware.
   
   Consider either: (1) logging the error at warning level before continuing, 
or (2) returning an error from the deduplicateAggregatedDataPoints function and 
propagating it to the caller. The second option is preferable as it makes 
failures explicit.
   ```suggestion
        // Use a dedicated logger for visibility into deduplication issues.
        log := logger.GetLogger("query").Named("measure-dedup").Sugar()
   
        groupMap := make(map[uint64]*measurev1.DataPoint)
        for _, dp := range dataPoints {
                key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
                if err != nil {
                        // If we can't compute the key, keep the data point but 
log the error
                        log.Warnf("failed to compute group-by key for 
aggregated data point; keeping data point without deduplication: %v", err)
   ```



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() 
[]*measurev1.DataPoint {
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from 
multiple replicas
+// by keeping only one data point per group. Since aggregated results from 
different replicas
+// for the same group should be identical, we can safely keep only one per 
group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+       if len(groupByTagsRefs) == 0 {
+               // No groupBy, return as is (should not happen for pushed down 
aggregation)
+               return dataPoints
+       }
+
+       groupMap := make(map[uint64]*measurev1.DataPoint)
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
+               if err != nil {
+                       // If we can't compute the key, keep the data point
+                       continue
+               }
+               // Keep the first data point for each group
+               if _, exists := groupMap[key]; !exists {
+                       groupMap[key] = dp
+               }

Review Comment:
   The deduplication logic assumes that hash collisions are impossible or 
acceptable. When two different groups produce the same hash value (uint64), 
only one data point will be kept (line 553-555). While xxhash has low collision 
probability, hash collisions are still possible with a 64-bit hash, especially 
with large datasets.
   
   Consider adding a secondary check to verify that deduplicated data points 
actually have matching group-by tag values, not just matching hashes. If the 
tags don't match, this indicates a hash collision and both data points should 
be kept.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to