Copilot commented on code in PR #924:
URL: https://github.com/apache/skywalking-banyandb/pull/924#discussion_r2661171843
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() []*measurev1.DataPoint {
func (s *pushedDownAggregatedIterator) Close() error {
	return nil
}
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas
+// by keeping only one data point per group. Since aggregated results from different replicas
+// for the same group should be identical, we can safely keep only one per group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+	if len(groupByTagsRefs) == 0 {
+		// No groupBy, return as is (should not happen for pushed down aggregation)
+		return dataPoints
+	}
+
+	groupMap := make(map[uint64]*measurev1.DataPoint)
+	for _, dp := range dataPoints {
+		key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
+		if err != nil {
+			// If we can't compute the key, keep the data point
+			continue
+		}
+		// Keep the first data point for each group
+		if _, exists := groupMap[key]; !exists {
+			groupMap[key] = dp
+		}
+	}
+
+	result := make([]*measurev1.DataPoint, 0, len(groupMap))
+	for _, dp := range groupMap {
+		result = append(result, dp)
+	}
+	return result
+}
+
+// formatGroupByKeyForDedup computes a hash key for a data point based on groupBy tags.
+func formatGroupByKeyForDedup(point *measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) (uint64, error) {
+	hash := xxhash.New()
+	for _, tagFamilyRef := range groupByTagsRefs {
+		for _, tagRef := range tagFamilyRef {
+			if tagRef.Spec.TagFamilyIdx >= len(point.GetTagFamilies()) {
+				return 0, fmt.Errorf("tag family index out of range")
+			}
+			if tagRef.Spec.TagIdx >= len(point.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()) {
+				return 0, fmt.Errorf("tag index out of range")
+			}
+			tag := point.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()[tagRef.Spec.TagIdx]
+			switch v := tag.GetValue().GetValue().(type) {
+			case *modelv1.TagValue_Str:
+				_, err := hash.Write([]byte(v.Str.GetValue()))
+				if err != nil {
+					return 0, err
+				}
+			case *modelv1.TagValue_Int:
+				_, err := hash.Write(convert.Int64ToBytes(v.Int.GetValue()))
+				if err != nil {
+					return 0, err
+				}
+			case *modelv1.TagValue_IntArray, *modelv1.TagValue_StrArray, *modelv1.TagValue_BinaryData:
+				return 0, fmt.Errorf("group-by on array/binary tag is not supported")
Review Comment:
The switch statement doesn't have a default case to handle unexpected tag value types. If a new tag value type is added to the modelv1.TagValue enum in the future, it will be silently skipped (the tag contributes nothing to the hash and no error is returned), which could lead to incorrect deduplication behavior.
Add a default case that returns an error for unknown tag value types to make the code more maintainable and catch potential issues early.
```suggestion
				return 0, fmt.Errorf("group-by on array/binary tag is not supported")
			default:
				return 0, fmt.Errorf("unsupported tag value type %T in group-by", v)
```
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() []*measurev1.DataPoint {
func (s *pushedDownAggregatedIterator) Close() error {
	return nil
}
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas
+// by keeping only one data point per group. Since aggregated results from different replicas
+// for the same group should be identical, we can safely keep only one per group.
Review Comment:
The comment states "Since aggregated results from different replicas for the
same group should be identical, we can safely keep only one per group." This
assumption is correct for MAX and MIN aggregations, but it's worth clarifying
in the comment that this applies to SUM as well. In a properly replicated
database system, all replicas contain identical data, so SUM(data) from replica
A should equal SUM(data) from replica B. The deduplication is removing
duplicate computation results, not combining partial sums. Consider updating the comment to make this clearer and avoid confusion; a toy example illustrating the distinction follows the suggestion below.
```suggestion
// by keeping only one data point per group. In a properly replicated system, each replica
// holds identical data, so any aggregate (including SUM, MIN, MAX, etc.) for the same group
// is expected to be identical across replicas. This function therefore drops duplicate
// full aggregation results from different replicas, rather than combining partial aggregates.
```
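To make the distinction concrete, here is a toy, self-contained Go example (values and names are hypothetical, not taken from the PR): each replica holds the full data set for a group and computes the same SUM, so the coordinator keeps one full result per group instead of adding the results as if they were partial sums.
```go
package main

import "fmt"

func main() {
	sum := func(xs []int64) (s int64) {
		for _, x := range xs {
			s += x
		}
		return s
	}

	series := []int64{1, 2, 3} // identical data on every replica

	replicaA := sum(series) // SUM computed on replica A -> 6
	replicaB := sum(series) // SUM computed on replica B -> 6 (identical full result)

	deduplicated := replicaA // keep exactly one full result for the group
	_ = replicaB             // the duplicate result is dropped, never summed again

	fmt.Println(deduplicated) // prints 6; combining them as partial sums would wrongly give 12
}
```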
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() []*measurev1.DataPoint {
func (s *pushedDownAggregatedIterator) Close() error {
	return nil
}
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas
+// by keeping only one data point per group. Since aggregated results from different replicas
+// for the same group should be identical, we can safely keep only one per group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+	if len(groupByTagsRefs) == 0 {
+		// No groupBy, return as is (should not happen for pushed down aggregation)
+		return dataPoints
+	}
+
+	groupMap := make(map[uint64]*measurev1.DataPoint)
+	for _, dp := range dataPoints {
+		key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
+		if err != nil {
+			// If we can't compute the key, keep the data point
+			continue
+		}
+		// Keep the first data point for each group
+		if _, exists := groupMap[key]; !exists {
+			groupMap[key] = dp
+		}
+	}
+
+	result := make([]*measurev1.DataPoint, 0, len(groupMap))
+	for _, dp := range groupMap {
+		result = append(result, dp)
Review Comment:
The result order from deduplication is nondeterministic because map
iteration order in Go is randomized. This means the same query could return
results in different orders on different executions, even with the same input
data. This could cause issues for clients that rely on consistent ordering or
for testing.
Consider either: (1) sorting the results before returning them, or (2)
documenting that deduplication does not preserve order. If consistent ordering
is required by the system, sorting should be added.
```suggestion
	keys := make([]uint64, 0, len(dataPoints))
	for _, dp := range dataPoints {
		key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
		if err != nil {
			// If we can't compute the key, keep the data point
			continue
		}
		// Keep the first data point for each group and record insertion order
		if _, exists := groupMap[key]; !exists {
			groupMap[key] = dp
			keys = append(keys, key)
		}
	}
	result := make([]*measurev1.DataPoint, 0, len(keys))
	for _, key := range keys {
		if dp, ok := groupMap[key]; ok {
			result = append(result, dp)
		}
```
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() []*measurev1.DataPoint {
func (s *pushedDownAggregatedIterator) Close() error {
	return nil
}
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas
+// by keeping only one data point per group. Since aggregated results from different replicas
+// for the same group should be identical, we can safely keep only one per group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+	if len(groupByTagsRefs) == 0 {
+		// No groupBy, return as is (should not happen for pushed down aggregation)
+		return dataPoints
+	}
+
+	groupMap := make(map[uint64]*measurev1.DataPoint)
+	for _, dp := range dataPoints {
+		key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
+		if err != nil {
+			// If we can't compute the key, keep the data point
Review Comment:
Silently skipping data points when the group-by key cannot be computed can lead to incorrect aggregation results and data loss. When an error occurs (such as a tag family/tag index out of range or an unsupported tag type), the `continue` drops the data point, contrary to the comment saying it is kept, without any logging or error propagation. This makes debugging difficult and could produce incorrect query results without the user being aware.
Consider either: (1) logging the error at warning level before continuing, or (2) returning an error from deduplicateAggregatedDataPoints and propagating it to the caller. The second option is preferable because it makes failures explicit; a sketch of it follows the suggestion below.
```suggestion
	// Use a dedicated logger for visibility into deduplication issues.
	log := logger.GetLogger("query").Named("measure-dedup").Sugar()
	groupMap := make(map[uint64]*measurev1.DataPoint)
	for _, dp := range dataPoints {
		key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
		if err != nil {
			// If we can't compute the key, keep the data point but log the error
			log.Warnf("failed to compute group-by key for aggregated data point; keeping data point without deduplication: %v", err)
```
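For option (2), a minimal sketch of an error-propagating variant, assuming the helper names from this PR; callers of deduplicateAggregatedDataPoints would also need to be updated to handle the returned error (those call sites are not shown here).
```go
// Sketch only: propagate key-computation failures instead of silently dropping data points.
func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {
	if len(groupByTagsRefs) == 0 {
		// No groupBy, return as is (should not happen for pushed down aggregation)
		return dataPoints, nil
	}
	groupMap := make(map[uint64]*measurev1.DataPoint)
	for _, dp := range dataPoints {
		key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
		if err != nil {
			return nil, fmt.Errorf("failed to compute group-by key for deduplication: %w", err)
		}
		// Keep the first data point for each group.
		if _, exists := groupMap[key]; !exists {
			groupMap[key] = dp
		}
	}
	result := make([]*measurev1.DataPoint, 0, len(groupMap))
	for _, dp := range groupMap {
		result = append(result, dp)
	}
	return result, nil
}
```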
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -514,3 +532,63 @@ func (s *pushedDownAggregatedIterator) Current() []*measurev1.DataPoint {
func (s *pushedDownAggregatedIterator) Close() error {
	return nil
}
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas
+// by keeping only one data point per group. Since aggregated results from different replicas
+// for the same group should be identical, we can safely keep only one per group.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
+	if len(groupByTagsRefs) == 0 {
+		// No groupBy, return as is (should not happen for pushed down aggregation)
+		return dataPoints
+	}
+
+	groupMap := make(map[uint64]*measurev1.DataPoint)
+	for _, dp := range dataPoints {
+		key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
+		if err != nil {
+			// If we can't compute the key, keep the data point
+			continue
+		}
+		// Keep the first data point for each group
+		if _, exists := groupMap[key]; !exists {
+			groupMap[key] = dp
+		}
Review Comment:
The deduplication logic assumes that hash collisions are impossible or
acceptable. When two different groups produce the same hash value (uint64),
only one data point will be kept (line 553-555). While xxhash has low collision
probability, hash collisions are still possible with a 64-bit hash, especially
with large datasets.
Consider adding a secondary check to verify that deduplicated data points
actually have matching group-by tag values, not just matching hashes. If the
tags don't match, this indicates a hash collision and both data points should
be kept.
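A minimal sketch of such a secondary check, assuming the types and helpers from this PR; deduplicateWithCollisionCheck and groupTagsEqual are hypothetical names, and proto.Equal comes from google.golang.org/protobuf/proto.
```go
// Sketch only: bucket data points by hash, but confirm actual group-by tag equality
// before treating a data point as a duplicate, so a hash collision keeps both groups.
func deduplicateWithCollisionCheck(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) []*measurev1.DataPoint {
	groupMap := make(map[uint64][]*measurev1.DataPoint)
	result := make([]*measurev1.DataPoint, 0, len(dataPoints))
	for _, dp := range dataPoints {
		key, err := formatGroupByKeyForDedup(dp, groupByTagsRefs)
		if err != nil {
			continue
		}
		duplicate := false
		for _, existing := range groupMap[key] {
			if groupTagsEqual(existing, dp, groupByTagsRefs) {
				duplicate = true // same group already seen on another replica
				break
			}
		}
		if !duplicate {
			// New group, or a hash collision with a different group: keep it either way.
			groupMap[key] = append(groupMap[key], dp)
			result = append(result, dp)
		}
	}
	return result
}

// groupTagsEqual compares the group-by tag values of two data points using proto
// equality; indexes are assumed to have been validated by formatGroupByKeyForDedup.
func groupTagsEqual(a, b *measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) bool {
	for _, tagFamilyRef := range groupByTagsRefs {
		for _, tagRef := range tagFamilyRef {
			av := a.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()[tagRef.Spec.TagIdx].GetValue()
			bv := b.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()[tagRef.Spec.TagIdx].GetValue()
			if !proto.Equal(av, bv) {
				return false
			}
		}
	}
	return true
}
```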
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]