hanahmily commented on code in PR #932:
URL:
https://github.com/apache/skywalking-banyandb/pull/932#discussion_r2680717655
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -535,9 +535,7 @@ func (s *pushedDownAggregatedIterator) Close() error {
return nil
}
-// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas
-// by keeping only one data point per group. Since replicas hold identical data, aggregates
-// for the same group are identical across replicas.
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas.
func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {
Review Comment:
`deduplicateAggregatedDataPoints` already deduplicates the data, but its logic
is too coarse: it deduplicates on the group-by key alone. That correctly
removes duplicates across replicas of the same shard, but it also drops data
points that share a group key and come from different shards. Those data
points must be retained.
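To illustrate the point, here is a minimal sketch of deduplication keyed on the shard as well as the group-by key. The `shardPoint` type, its fields, and `dedupByShardAndGroup` are hypothetical names for illustration only; they are not the PR's types, and the sketch assumes each aggregated data point can be tagged with the shard that produced it.

```go
package main

import "fmt"

// shardPoint is a hypothetical stand-in for an aggregated data point that
// carries the ID of the shard it was computed on.
type shardPoint struct {
	ShardID  uint64
	GroupKey string // assumed: the group-by tag values flattened into one key
	Value    int64
}

// dedupKey combines shard and group so that only replicas of the same shard
// collapse into one entry.
type dedupKey struct {
	shardID  uint64
	groupKey string
}

// dedupByShardAndGroup keeps the first data point seen for each
// (shard, group) pair and preserves input order.
func dedupByShardAndGroup(points []shardPoint) []shardPoint {
	seen := make(map[dedupKey]struct{}, len(points))
	out := make([]shardPoint, 0, len(points))
	for _, p := range points {
		k := dedupKey{shardID: p.ShardID, groupKey: p.GroupKey}
		if _, ok := seen[k]; ok {
			continue // same shard and group: a replica duplicate
		}
		seen[k] = struct{}{}
		out = append(out, p)
	}
	return out
}

func main() {
	points := []shardPoint{
		{ShardID: 0, GroupKey: "svc-a", Value: 10}, // shard 0, first replica
		{ShardID: 0, GroupKey: "svc-a", Value: 10}, // shard 0, second replica
		{ShardID: 1, GroupKey: "svc-a", Value: 7},  // shard 1, same group
	}
	for _, p := range dedupByShardAndGroup(points) {
		fmt.Printf("shard=%d group=%s value=%d\n", p.ShardID, p.GroupKey, p.Value)
	}
}
```

Keying on the group alone would keep only the first `svc-a` entry and silently lose shard 1's contribution.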
##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -180,12 +181,18 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, ss []logical.Schema) (
}
if criteria.GetAgg() != nil {
- plan = newUnresolvedAggregation(plan,
- logical.NewField(criteria.GetAgg().GetFieldName()),
- criteria.GetAgg().GetFunction(),
- criteria.GetGroupBy() != nil,
- )
- pushedLimit = math.MaxInt
+ // If needCompletePushDownAgg is true and has GROUP BY, skip aggregation plan
Review Comment:
Per my comment on `deduplicateAggregatedDataPoints`, the current logic here
should remain unchanged.
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -535,9 +535,7 @@ func (s *pushedDownAggregatedIterator) Close() error {
return nil
}
-// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas
-// by keeping only one data point per group. Since replicas hold identical data, aggregates
-// for the same group are identical across replicas.
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas.
func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {
Review Comment:
You could add some new protobuf messages and an RPC that carry the shard ID:
```proto
message InternalDataPoint {
  DataPoint data_point = 1;
  uint64 shard_id = 2;
}

message InternalQueryResponse {
  repeated InternalDataPoint data_points = 1;
  common.v1.Trace trace = 2;
}

message InternalQueryRequest {
  QueryRequest query_request = 1;
}

rpc InternalQuery(InternalQueryRequest) returns (InternalQueryResponse);
```
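Once each data point carries a shard ID as in the messages above, the caller can combine the per-shard partial results for a group instead of discarding them. The following is only a sketch under assumptions: it uses a hypothetical `partial` type rather than the generated protobuf types, assumes replica duplicates were already removed per shard, and assumes the aggregation function is SUM, so partial results can simply be added.

```go
package main

import "fmt"

// partial is a hypothetical, simplified view of an InternalDataPoint:
// one partial SUM for one group, computed on one shard.
type partial struct {
	ShardID  uint64
	GroupKey string // assumed: the group-by tag values flattened into one key
	Sum      int64
}

// mergeSums adds up the per-shard partial sums for every group.
// It assumes the input holds at most one entry per (shard, group) pair.
func mergeSums(parts []partial) map[string]int64 {
	totals := make(map[string]int64)
	for _, p := range parts {
		totals[p.GroupKey] += p.Sum
	}
	return totals
}

func main() {
	parts := []partial{
		{ShardID: 0, GroupKey: "svc-a", Sum: 10},
		{ShardID: 1, GroupKey: "svc-a", Sum: 7},
		{ShardID: 1, GroupKey: "svc-b", Sum: 3},
	}
	totals := mergeSums(parts)
	fmt.Println(totals["svc-a"], totals["svc-b"])
}
```

Other aggregation functions would need their own merge rule. A mean, for example, cannot be rebuilt from per-shard means alone and would need the sums and counts.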