hanahmily commented on code in PR #924:
URL: 
https://github.com/apache/skywalking-banyandb/pull/924#discussion_r2661407345


##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() 
[]*measurev1.DataPoint {
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from 
multiple replicas
+// by keeping only one data point per group. Since aggregated results from 
different replicas
+// for the same group should be identical, we can safely keep only one per 
group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+       if len(groupByTagsRefs) == 0 {
+               // No groupBy, return as is (should not happen for pushed down 
aggregation)
+               return dataPoints
+       }
+
+       groupMap := make(map[uint64]*measurev1.DataPoint)
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
+               if err != nil {
+                       // If we can't compute the key, keep the data point
+                       continue
+               }
+               // Keep the first data point for each group
+               if _, exists := groupMap[key]; !exists {
+                       groupMap[key] = dp
+               }
+       }
+
+       result := make([]*measurev1.DataPoint, 0, len(groupMap))
+       for _, dp := range groupMap {
+               result = append(result, dp)
+       }
+       return result
+}
+
+// formatGroupByKeyForDedup computes a hash key for a data point based on 
groupBy tags.
+func formatGroupByKeyForDedup(point *measurev1.DataPoint, groupByTagsRefs 
[][]*logical.TagRef) (uint64, error) {
+       hash := xxhash.New()
+       for _, tagFamilyRef := range groupByTagsRefs {
+               for _, tagRef := range tagFamilyRef {
+                       if tagRef.Spec.TagFamilyIdx >= 
len(point.GetTagFamilies()) {
+                               return 0, fmt.Errorf("tag family index out of 
range")
+                       }
+                       if tagRef.Spec.TagIdx >= 
len(point.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()) {
+                               return 0, fmt.Errorf("tag index out of range")
+                       }
+                       tag := 
point.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()[tagRef.Spec.TagIdx]
+                       switch v := tag.GetValue().GetValue().(type) {
+                       case *modelv1.TagValue_Str:
+                               _, err := hash.Write([]byte(v.Str.GetValue()))
+                               if err != nil {
+                                       return 0, err
+                               }
+                       case *modelv1.TagValue_Int:
+                               _, err := 
hash.Write(convert.Int64ToBytes(v.Int.GetValue()))
+                               if err != nil {
+                                       return 0, err
+                               }
+                       case *modelv1.TagValue_IntArray, 
*modelv1.TagValue_StrArray, *modelv1.TagValue_BinaryData:
+                               return 0, fmt.Errorf("group-by on array/binary 
tag is not supported")

Review Comment:
   Apply the same change (returning an error for group-by on array/binary tags) to `formatGroupByKey` as well.



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() 
[]*measurev1.DataPoint {
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from 
multiple replicas
+// by keeping only one data point per group. Since aggregated results from 
different replicas
+// for the same group should be identical, we can safely keep only one per 
group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+       if len(groupByTagsRefs) == 0 {
+               // No groupBy, return as is (should not happen for pushed down 
aggregation)
+               return dataPoints
+       }
+
+       groupMap := make(map[uint64]*measurev1.DataPoint)
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
+               if err != nil {
+                       // If we can't compute the key, keep the data point

Review Comment:
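   You should return the error instead of skipping it. Note that the comment above says the data point is kept, but `continue` actually drops it silently.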



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() 
[]*measurev1.DataPoint {
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from 
multiple replicas
+// by keeping only one data point per group. Since aggregated results from 
different replicas
+// for the same group should be identical, we can safely keep only one per 
group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+       if len(groupByTagsRefs) == 0 {
+               // No groupBy, return as is (should not happen for pushed down 
aggregation)
+               return dataPoints
+       }
+
+       groupMap := make(map[uint64]*measurev1.DataPoint)

Review Comment:
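   You can build the deduplicated result in a single loop: append each data point to the result slice the first time its group key is seen. This removes the second pass over the map, which also returns groups in a nondeterministic order.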



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() 
[]*measurev1.DataPoint {
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from 
multiple replicas
+// by keeping only one data point per group. Since aggregated results from 
different replicas
+// for the same group should be identical, we can safely keep only one per 
group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+       if len(groupByTagsRefs) == 0 {
+               // No groupBy, return as is (should not happen for pushed down 
aggregation)
+               return dataPoints
+       }
+
+       groupMap := make(map[uint64]*measurev1.DataPoint)
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
+               if err != nil {
+                       // If we can't compute the key, keep the data point
+                       continue
+               }
+               // Keep the first data point for each group
+               if _, exists := groupMap[key]; !exists {
+                       groupMap[key] = dp
+               }
+       }
+
+       result := make([]*measurev1.DataPoint, 0, len(groupMap))
+       for _, dp := range groupMap {
+               result = append(result, dp)
+       }
+       return result
+}
+
+// formatGroupByKeyForDedup computes a hash key for a data point based on 
groupBy tags.
+func formatGroupByKeyForDedup(point *measurev1.DataPoint, groupByTagsRefs 
[][]*logical.TagRef) (uint64, error) {

Review Comment:
   Please reuse `formatGroupByKey` instead of duplicating it.



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() 
[]*measurev1.DataPoint {
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from 
multiple replicas
+// by keeping only one data point per group. Since aggregated results from 
different replicas
+// for the same group should be identical, we can safely keep only one per 
group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+       if len(groupByTagsRefs) == 0 {
+               // No groupBy, return as is (should not happen for pushed down 
aggregation)
+               return dataPoints
+       }
+
+       groupMap := make(map[uint64]*measurev1.DataPoint)

Review Comment:
   ```suggestion
        groupMap := make(map[uint64]struct{})
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to