This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new ff8bbe9a refactor deduplicateAggregatedDataPoints (#943)
ff8bbe9a is described below

commit ff8bbe9af2387f75f64d7ee9a9674305a1404ad7
Author: OmCheeLin <[email protected]>
AuthorDate: Tue Jan 20 21:15:49 2026 +0800

    refactor deduplicateAggregatedDataPoints (#943)
    
    
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
 api/data/data.go                                   |   7 +
 api/data/measure.go                                |  10 +
 api/proto/banyandb/measure/v1/query.proto          |  25 ++
 api/proto/banyandb/measure/v1/rpc.proto            |   4 +
 banyand/dquery/measure.go                          |   2 +-
 banyand/internal/storage/segment.go                |  13 +
 banyand/internal/storage/storage.go                |   1 +
 banyand/measure/block.go                           |   5 +
 banyand/measure/cache_benchmark_test.go            |   2 +-
 banyand/measure/part.go                            |   1 +
 banyand/measure/query.go                           |  17 +-
 banyand/measure/query_test.go                      |   7 +-
 banyand/query/processor.go                         | 341 ++++++++++++++++-----
 banyand/query/processor_topn.go                    |   2 +-
 banyand/query/query.go                             |   7 +
 docs/api-reference.md                              |  54 ++++
 pkg/query/executor/interface.go                    |   2 +-
 pkg/query/logical/measure/measure_plan.go          |   2 +-
 .../logical/measure/measure_plan_aggregation.go    |  17 +-
 .../logical/measure/measure_plan_distributed.go    | 137 +++++----
 .../measure/measure_plan_distributed_test.go       | 184 +++++++++--
 pkg/query/logical/measure/measure_plan_groupby.go  |  32 +-
 .../measure/measure_plan_indexscan_local.go        |  16 +-
 pkg/query/logical/measure/measure_plan_top.go      |  10 +-
 pkg/query/logical/measure/measure_top.go           |   6 +-
 pkg/query/logical/measure/topn_plan_localscan.go   |  10 +-
 pkg/query/logical/measure/topn_plan_merge.go       |   6 +-
 pkg/query/model/model.go                           |   1 +
 28 files changed, 713 insertions(+), 208 deletions(-)

diff --git a/api/data/data.go b/api/data/data.go
index 956c9e33..61c97897 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -35,6 +35,7 @@ var (
                TopicStreamQuery.String():              TopicStreamQuery,
                TopicMeasureWrite.String():             TopicMeasureWrite,
                TopicMeasureQuery.String():             TopicMeasureQuery,
+               TopicInternalMeasureQuery.String():     TopicInternalMeasureQuery,
                TopicTopNQuery.String():                TopicTopNQuery,
                TopicPropertyDelete.String():           TopicPropertyDelete,
                TopicPropertyQuery.String():            TopicPropertyQuery,
@@ -71,6 +72,9 @@ var (
                TopicMeasureQuery: func() proto.Message {
                        return &measurev1.QueryRequest{}
                },
+               TopicInternalMeasureQuery: func() proto.Message {
+                       return &measurev1.InternalQueryRequest{}
+               },
                TopicTopNQuery: func() proto.Message {
                        return &measurev1.TopNRequest{}
                },
@@ -139,6 +143,9 @@ var (
                TopicMeasureQuery: func() proto.Message {
                        return &measurev1.QueryResponse{}
                },
+               TopicInternalMeasureQuery: func() proto.Message {
+                       return &measurev1.InternalQueryResponse{}
+               },
                TopicTopNQuery: func() proto.Message {
                        return &measurev1.TopNResponse{}
                },
diff --git a/api/data/measure.go b/api/data/measure.go
index ea9eb8de..2bf2bc4d 100644
--- a/api/data/measure.go
+++ b/api/data/measure.go
@@ -40,6 +40,16 @@ var MeasureQueryKindVersion = common.KindVersion{
 // TopicMeasureQuery is the measure query topic.
 var TopicMeasureQuery = bus.BiTopic(MeasureQueryKindVersion.String())
 
+// InternalMeasureQueryKindVersion is the version tag of internal measure query kind.
+var InternalMeasureQueryKindVersion = common.KindVersion{
+       Version: "v1",
+       Kind:    "internal-measure-query",
+}
+
+// TopicInternalMeasureQuery is the internal measure query topic.
+// Used for distributed query with shard information.
+var TopicInternalMeasureQuery = bus.BiTopic(InternalMeasureQueryKindVersion.String())
+
 // TopNQueryKindVersion is the version tag of top-n query kind.
 var TopNQueryKindVersion = common.KindVersion{
        Version: "v1",
diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto
index 9bd0f045..a3d10adb 100644
--- a/api/proto/banyandb/measure/v1/query.proto
+++ b/api/proto/banyandb/measure/v1/query.proto
@@ -55,6 +55,31 @@ message QueryResponse {
   common.v1.Trace trace = 2;
 }
 
+// InternalDataPoint wraps DataPoint with shard information for internal use.
+// Used in distributed query to distinguish data from different shards.
+message InternalDataPoint {
+  // The actual data point
+  DataPoint data_point = 1;
+  // The shard id where this data point comes from
+  uint32 shard_id = 2;
+}
+
+// InternalQueryRequest is the internal request for distributed query.
+// Wraps QueryRequest for extensibility.
+message InternalQueryRequest {
+  // The actual query request
+  QueryRequest request = 1;
+}
+
+// InternalQueryResponse is the internal response for distributed query.
+// Contains shard information for proper deduplication.
+message InternalQueryResponse {
+  // data_points with shard information
+  repeated InternalDataPoint data_points = 1;
+  // trace contains the trace information of the query when trace is enabled
+  common.v1.Trace trace = 2;
+}
+
 // QueryRequest is the request contract for query.
 message QueryRequest {
   // groups indicate where the data points are stored.
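
A minimal sketch of how the new messages compose, assuming only the field
names defined above (the wrap and unwrap helpers are illustrative, not part
of this commit):

    package sketch

    import (
            measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
    )

    // wrap embeds the user-facing QueryRequest in the internal request
    // that the liaison broadcasts to data nodes.
    func wrap(q *measurev1.QueryRequest) *measurev1.InternalQueryRequest {
            return &measurev1.InternalQueryRequest{Request: q}
    }

    // unwrap flattens an internal response back to plain DataPoints once
    // the shard ids have served their purpose (per-shard deduplication).
    func unwrap(r *measurev1.InternalQueryResponse) []*measurev1.DataPoint {
            dps := make([]*measurev1.DataPoint, 0, len(r.GetDataPoints()))
            for _, idp := range r.GetDataPoints() {
                    dps = append(dps, idp.GetDataPoint())
            }
            return dps
    }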
diff --git a/api/proto/banyandb/measure/v1/rpc.proto b/api/proto/banyandb/measure/v1/rpc.proto
index 0f1d1bf5..5f547384 100644
--- a/api/proto/banyandb/measure/v1/rpc.proto
+++ b/api/proto/banyandb/measure/v1/rpc.proto
@@ -46,6 +46,10 @@ service MeasureService {
     };
   }
 
+  // InternalQuery is used for internal distributed query between liaison and data nodes.
+  // Returns InternalQueryResponse with shard information for proper deduplication.
+  rpc InternalQuery(InternalQueryRequest) returns (InternalQueryResponse);
+
   rpc Write(stream WriteRequest) returns (stream WriteResponse);
   rpc TopN(TopNRequest) returns (TopNResponse) {
     option (google.api.http) = {
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index cbe8fe3e..3bfaaea4 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -160,7 +160,7 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (r
                        r++
                        current := mIterator.Current()
                        if len(current) > 0 {
-                               result = append(result, current[0])
+                               result = append(result, current[0].GetDataPoint())
                        }
                }
        }()
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 08408aca..74dff151 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -141,6 +141,19 @@ func (s *segment[T, O]) Tables() (tt []T, cc []Cache) {
        return tt, cc
 }
 
+// TablesWithShardIDs returns tables with their corresponding shard IDs.
+func (s *segment[T, O]) TablesWithShardIDs() (tt []T, shardIDs []common.ShardID, cc []Cache) {
+       sLst := s.sLst.Load()
+       if sLst != nil {
+               for _, sh := range *sLst {
+                       tt = append(tt, sh.table)
+                       shardIDs = append(shardIDs, sh.id)
+                       cc = append(cc, sh.shardCache)
+               }
+       }
+       return tt, shardIDs, cc
+}
+
 func (s *segment[T, O]) incRef(ctx context.Context) error {
        s.lastAccessed.Store(time.Now().UnixNano())
        if atomic.LoadInt32(&s.refCount) <= 0 {
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index e4cb4d5b..5ac1b883 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -128,6 +128,7 @@ type Segment[T TSTable, O any] interface {
        GetTimeRange() timestamp.TimeRange
        CreateTSTableIfNotExist(shardID common.ShardID) (T, error)
        Tables() ([]T, []Cache)
+       TablesWithShardIDs() ([]T, []common.ShardID, []Cache)
        Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error)
        IndexDB() IndexDB
 }
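
The three slices returned by TablesWithShardIDs are index-aligned. A sketch
of a caller pairing them up (the collect helper is illustrative, and the
banyand/internal/storage import is only reachable from inside this module):

    package sketch

    import (
            "github.com/apache/skywalking-banyandb/api/common"
            "github.com/apache/skywalking-banyandb/banyand/internal/storage"
    )

    // collect maps each shard to its table; position i of the tables,
    // shardIDs, and caches slices all describe the same shard.
    func collect[T storage.TSTable, O any](seg storage.Segment[T, O]) map[common.ShardID]T {
            tables, shardIDs, _ := seg.TablesWithShardIDs()
            out := make(map[common.ShardID]T, len(tables))
            for i, t := range tables {
                    out[shardIDs[i]] = t
            }
            return out
    }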
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 24c9cd48..6cffb728 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -445,6 +445,7 @@ type blockCursor struct {
        idx                 int
        minTimestamp        int64
        maxTimestamp        int64
+       shardID             common.ShardID
 }
 
 func (bc *blockCursor) reset() {
@@ -497,6 +498,9 @@ func (bc *blockCursor) copyAllTo(r *model.MeasureResult, storedIndexValue map[co
        r.SID = bc.bm.seriesID
        r.Timestamps = append(r.Timestamps, bc.timestamps[idx:offset]...)
        r.Versions = append(r.Versions, bc.versions[idx:offset]...)
+       for i := 0; i < size; i++ {
+               r.ShardIDs = append(r.ShardIDs, bc.shardID)
+       }
        if desc {
                slices.Reverse(r.Timestamps)
                slices.Reverse(r.Versions)
@@ -595,6 +599,7 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult, storedIndexValue map[commo
        r.SID = bc.bm.seriesID
        r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx])
        r.Versions = append(r.Versions, bc.versions[bc.idx])
+       r.ShardIDs = append(r.ShardIDs, bc.shardID)
        var indexValue map[string]*modelv1.TagValue
        if storedIndexValue != nil {
                indexValue = storedIndexValue[r.SID]
diff --git a/banyand/measure/cache_benchmark_test.go b/banyand/measure/cache_benchmark_test.go
index 12c2f368..b8052cfa 100644
--- a/banyand/measure/cache_benchmark_test.go
+++ b/banyand/measure/cache_benchmark_test.go
@@ -330,7 +330,7 @@ func (m *measure) QueryWithCache(ctx context.Context, mqo model.MeasureQueryOpti
                }
        }
 
-       sids, tables, _, storedIndexValue, newTagProjection, err := m.searchSeriesList(ctx, series, mqo, segments)
+       sids, tables, _, _, storedIndexValue, newTagProjection, err := m.searchSeriesList(ctx, series, mqo, segments)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 7cb29232..844713fb 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -66,6 +66,7 @@ type part struct {
        path                 string
        primaryBlockMetadata []primaryBlockMetadata
        partMetadata         partMetadata
+       shardID              common.ShardID
 }
 
 func (p *part) close() {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 22fa8aa4..495e0304 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -120,7 +120,7 @@ func (m *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
                }
        }
 
-       sids, tables, caches, storedIndexValue, newTagProjection, err := m.searchSeriesList(ctx, series, mqo, segments)
+       sids, tables, tableShardIDs, caches, storedIndexValue, newTagProjection, err := m.searchSeriesList(ctx, series, mqo, segments)
        if err != nil {
                return nil, err
        }
@@ -166,11 +166,16 @@ func (m *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
                if s == nil {
                        continue
                }
+               oldLen := len(parts)
                parts, n = s.getParts(parts, caches[i], qo.minTimestamp, qo.maxTimestamp)
                if n < 1 {
                        s.decRef()
                        continue
                }
+               // Set shard ID for newly added parts
+               for j := oldLen; j < len(parts); j++ {
+                       parts[j].shardID = tableShardIDs[i]
+               }
                result.snapshots = append(result.snapshots, s)
        }
 
@@ -212,7 +217,7 @@ type tagNameWithType struct {
 
 func (m *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions,
        segments []storage.Segment[*tsTable, option],
-) (sl []common.SeriesID, tables []*tsTable, caches []storage.Cache, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
+) (sl []common.SeriesID, tables []*tsTable, tableShardIDs []common.ShardID, caches []storage.Cache, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
        newTagProjection []model.TagProjection, err error,
 ) {
        var indexProjection []index.FieldKey
@@ -266,11 +271,12 @@ func (m *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m
                        Projection:  indexProjection,
                })
                if err != nil {
-                       return nil, nil, nil, nil, nil, err
+                       return nil, nil, nil, nil, nil, nil, err
                }
                if len(sd.SeriesList) > 0 {
-                       tt, cc := segments[i].Tables()
+                       tt, shardIDs, cc := segments[i].TablesWithShardIDs()
                        tables = append(tables, tt...)
+                       tableShardIDs = append(tableShardIDs, shardIDs...)
                        caches = append(caches, cc...)
 
                        // Create segResult for this segment
@@ -360,7 +366,7 @@ func (m *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m
                }
        }
 
-       return sl, tables, caches, storedIndexValue, newTagProjection, nil
+       return sl, tables, tableShardIDs, caches, storedIndexValue, newTagProjection, nil
 }
 
 func (m *measure) buildStoredIndexValue(
@@ -515,6 +521,7 @@ func (m *measure) searchBlocks(ctx context.Context, result *queryResult, sids []
                bc := generateBlockCursor()
                p := tstIter.piHeap[0]
                bc.init(p.p, p.curBlock, qo)
+               bc.shardID = p.p.shardID
                result.data = append(result.data, bc)
                totalBlockBytes += bc.bm.uncompressedSizeBytes
                if quota >= 0 && totalBlockBytes > uint64(quota) {
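
The shard id now travels from table to part to blockCursor and ends up in
MeasureResult.ShardIDs. The Query hunk above relies on a small pattern worth
noting: record the slice length before getParts appends, then stamp only the
newly added entries. A reduced sketch with stand-in types:

    package sketch

    // part is a stand-in for the real part struct above.
    type part struct{ shardID uint32 }

    // tagNew appends produced parts and stamps only the new entries with
    // the shard that produced them, leaving entries stamped by earlier
    // iterations untouched.
    func tagNew(parts, produced []part, shardID uint32) []part {
            oldLen := len(parts)
            parts = append(parts, produced...)
            for j := oldLen; j < len(parts); j++ {
                    parts[j].shardID = shardID
            }
            return parts
    }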
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 18851e8d..66ab6077 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -25,6 +25,7 @@ import (
        "time"
 
        "github.com/google/go-cmp/cmp"
+       "github.com/google/go-cmp/cmp/cmpopts"
        "github.com/stretchr/testify/require"
        "google.golang.org/protobuf/testing/protocmp"
 
@@ -1295,7 +1296,8 @@ func TestQueryResult(t *testing.T) {
                                }
 
                                if diff := cmp.Diff(got, tt.want,
-                                       protocmp.IgnoreUnknown(), protocmp.Transform()); diff != "" {
+                                       protocmp.IgnoreUnknown(), protocmp.Transform(),
+                                       cmpopts.IgnoreFields(model.MeasureResult{}, "ShardIDs")); diff != "" {
                                        t.Errorf("Unexpected []pbv1.Result (-got +want):\n%s", diff)
                                }
                        }
@@ -1512,7 +1514,8 @@ func TestQueryResult_QuotaExceeded(t *testing.T) {
                                got = append(got, *r)
                        }
                        if diff := cmp.Diff(got, tt.want,
-                               protocmp.IgnoreUnknown(), protocmp.Transform()); diff != "" {
+                               protocmp.IgnoreUnknown(), protocmp.Transform(),
+                               cmpopts.IgnoreFields(model.MeasureResult{}, "ShardIDs")); diff != "" {
                                t.Errorf("Unexpected []pbv1.Result (-got +want):\n%s", diff)
                        }
                })
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 388ac0b5..cbe7d783 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -54,6 +54,7 @@ const (
 var (
        _ bus.MessageListener            = (*streamQueryProcessor)(nil)
        _ bus.MessageListener            = (*measureQueryProcessor)(nil)
+       _ bus.MessageListener            = (*measureInternalQueryProcessor)(nil)
        _ bus.MessageListener            = (*traceQueryProcessor)(nil)
        _ executor.TraceExecutionContext = trace.Trace(nil)
 )
@@ -161,6 +162,129 @@ type measureQueryProcessor struct {
        *bus.UnImplementedHealthyListener
 }
 
+// measureExecutionContext holds the common execution context for measure queries.
+type measureExecutionContext struct {
+       ml       *logger.Logger
+       ecc      []executor.MeasureExecutionContext
+       metadata []*commonv1.Metadata
+       schemas  []logical.Schema
+}
+
+// buildMeasureContext builds the execution context for a measure query.
+func buildMeasureContext(measureService measure.Service, log *logger.Logger, queryCriteria *measurev1.QueryRequest, logPrefix string) (*measureExecutionContext, error) {
+       var metadata []*commonv1.Metadata
+       var schemas []logical.Schema
+       var ecc []executor.MeasureExecutionContext
+       for i := range queryCriteria.Groups {
+               meta := &commonv1.Metadata{
+                       Name:  queryCriteria.Name,
+                       Group: queryCriteria.Groups[i],
+               }
+               ec, ecErr := measureService.Measure(meta)
+               if ecErr != nil {
+                       return nil, fmt.Errorf("fail to get execution context for measure %s: %w", meta.GetName(), ecErr)
+               }
+               s, schemaErr := logical_measure.BuildSchema(ec.GetSchema(), ec.GetIndexRules())
+               if schemaErr != nil {
+                       return nil, fmt.Errorf("fail to build schema for measure %s: %w", meta.GetName(), schemaErr)
+               }
+               ecc = append(ecc, ec)
+               schemas = append(schemas, s)
+               metadata = append(metadata, meta)
+       }
+       ml := log.Named(logPrefix, queryCriteria.Groups[0], queryCriteria.Name)
+       return &measureExecutionContext{
+               metadata: metadata,
+               schemas:  schemas,
+               ecc:      ecc,
+               ml:       ml,
+       }, nil
+}
+
+// executeMeasurePlan executes the measure query plan and returns the iterator.
+func executeMeasurePlan(ctx context.Context, queryCriteria *measurev1.QueryRequest, mctx *measureExecutionContext) (executor.MIterator, logical.Plan, error) {
+       plan, planErr := logical_measure.Analyze(queryCriteria, mctx.metadata, mctx.schemas, mctx.ecc)
+       if planErr != nil {
+               return nil, nil, fmt.Errorf("fail to analyze the query request for measure %s: %w", queryCriteria.GetName(), planErr)
+       }
+       if e := mctx.ml.Debug(); e.Enabled() {
+               e.Str("plan", plan.String()).Msg("query plan")
+       }
+       mIterator, execErr := plan.(executor.MeasureExecutable).Execute(ctx)
+       if execErr != nil {
+               mctx.ml.Error().Err(execErr).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to query")
+               return nil, nil, fmt.Errorf("fail to execute the query plan for measure %s: %w", queryCriteria.GetName(), execErr)
+       }
+       return mIterator, plan, nil
+}
+
+// extractTagValuesFromInternalDataPoints extracts tag values from InternalDataPoints for RewriteAggTopNResult.
+func extractTagValuesFromInternalDataPoints(dataPoints []*measurev1.InternalDataPoint, groupByTags []string) map[string][]*modelv1.TagValue {
+       tagValueMap := make(map[string][]*modelv1.TagValue)
+       for _, idp := range dataPoints {
+               if idp.DataPoint != nil {
+                       extractTagValuesFromDataPoint(idp.DataPoint, groupByTags, tagValueMap)
+               }
+       }
+       return tagValueMap
+}
+
+// collectInternalDataPoints collects InternalDataPoints from the iterator.
+func collectInternalDataPoints(mIterator executor.MIterator) []*measurev1.InternalDataPoint {
+       result := make([]*measurev1.InternalDataPoint, 0)
+       for mIterator.Next() {
+               current := mIterator.Current()
+               if len(current) > 0 {
+                       result = append(result, current[0])
+               }
+       }
+       return result
+}
+
+// extractTagValuesFromDataPoints extracts tag values from DataPoints for RewriteAggTopNResult.
+func extractTagValuesFromDataPoints(dataPoints []*measurev1.DataPoint, groupByTags []string) map[string][]*modelv1.TagValue {
+       tagValueMap := make(map[string][]*modelv1.TagValue)
+       for _, dp := range dataPoints {
+               extractTagValuesFromDataPoint(dp, groupByTags, tagValueMap)
+       }
+       return tagValueMap
+}
+
+// extractTagValuesFromDataPoint extracts tag values from a single DataPoint and appends to tagValueMap.
+func extractTagValuesFromDataPoint(dp *measurev1.DataPoint, groupByTags []string, tagValueMap map[string][]*modelv1.TagValue) {
+       for _, tagFamily := range dp.GetTagFamilies() {
+               for _, tag := range tagFamily.GetTags() {
+                       tagName := tag.GetKey()
+                       if len(groupByTags) == 0 || slices.Contains(groupByTags, tagName) {
+                               tagValueMap[tagName] = append(tagValueMap[tagName], tag.GetValue())
+                       }
+               }
+       }
+}
+
+// getGroupByTags extracts group by tag names from query criteria.
+func getGroupByTags(queryCriteria *measurev1.QueryRequest) []string {
+       groupByTags := make([]string, 0)
+       if queryCriteria.GetGroupBy() != nil {
+               for _, tagFamily := range queryCriteria.GetGroupBy().GetTagProjection().GetTagFamilies() {
+                       groupByTags = append(groupByTags, tagFamily.GetTags()...)
+               }
+       }
+       return groupByTags
+}
+
+// buildRewriteQueryCriteria builds the rewrite query criteria for RewriteAggTopNResult.
+func buildRewriteQueryCriteria(queryCriteria *measurev1.QueryRequest, rewrittenCriteria *modelv1.Criteria) *measurev1.QueryRequest {
+       return &measurev1.QueryRequest{
+               Groups:          queryCriteria.Groups,
+               Name:            queryCriteria.Name,
+               TimeRange:       queryCriteria.TimeRange,
+               Criteria:        rewrittenCriteria,
+               TagProjection:   queryCriteria.TagProjection,
+               FieldProjection: queryCriteria.FieldProjection,
+       }
+}
+
 func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
        queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
        n := time.Now()
@@ -182,36 +306,14 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (r
                if len(result) == 0 {
                        return
                }
-               groupByTags := make([]string, 0)
-               if queryCriteria.GetGroupBy() != nil {
-                       for _, tagFamily := range queryCriteria.GetGroupBy().GetTagProjection().GetTagFamilies() {
-                               groupByTags = append(groupByTags, tagFamily.GetTags()...)
-                       }
-               }
-               tagValueMap := make(map[string][]*modelv1.TagValue)
-               for _, dp := range result {
-                       for _, tagFamily := range dp.GetTagFamilies() {
-                               for _, tag := range tagFamily.GetTags() {
-                                       tagName := tag.GetKey()
-                                       if len(groupByTags) == 0 || slices.Contains(groupByTags, tagName) {
-                                               tagValueMap[tagName] = append(tagValueMap[tagName], tag.GetValue())
-                                       }
-                               }
-                       }
-               }
-               rewriteCriteria, err := rewriteCriteria(tagValueMap)
-               if err != nil {
-                       p.log.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
+               groupByTags := getGroupByTags(queryCriteria)
+               tagValueMap := extractTagValuesFromDataPoints(result, groupByTags)
+               rewrittenCriteria, rewriteErr := rewriteCriteria(tagValueMap)
+               if rewriteErr != nil {
+                       p.log.Error().Err(rewriteErr).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
                        return
                }
-               rewriteQueryCriteria := &measurev1.QueryRequest{
-                       Groups:          queryCriteria.Groups,
-                       Name:            queryCriteria.Name,
-                       TimeRange:       queryCriteria.TimeRange,
-                       Criteria:        rewriteCriteria,
-                       TagProjection:   queryCriteria.TagProjection,
-                       FieldProjection: queryCriteria.FieldProjection,
-               }
+               rewriteQueryCriteria := buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria)
                resp = p.executeQuery(ctx, rewriteQueryCriteria)
                dataPoints, handleErr := handleResponse(resp)
                if handleErr != nil {
@@ -226,43 +328,32 @@ func (p *measureQueryProcessor) executeQuery(ctx context.Context, queryCriteria
        n := time.Now()
        now := n.UnixNano()
        defer func() {
-               if err := recover(); err != nil {
-                       p.log.Error().Interface("err", err).RawJSON("req", logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
+               if recoverErr := recover(); recoverErr != nil {
+                       p.log.Error().Interface("err", recoverErr).RawJSON("req", logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
                        resp = bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
                }
        }()
-       var metadata []*commonv1.Metadata
-       var schemas []logical.Schema
-       var ecc []executor.MeasureExecutionContext
-       for i := range queryCriteria.Groups {
-               meta := &commonv1.Metadata{
-                       Name:  queryCriteria.Name,
-                       Group: queryCriteria.Groups[i],
-               }
-               ec, err := p.measureService.Measure(meta)
-               if err != nil {
-                       resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to get execution context for measure %s: %v", meta.GetName(), err))
-                       return
-               }
-               s, err := logical_measure.BuildSchema(ec.GetSchema(), ec.GetIndexRules())
-               if err != nil {
-                       resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for measure %s: %v", meta.GetName(), err))
-                       return
-               }
-               ecc = append(ecc, ec)
-               schemas = append(schemas, s)
-               metadata = append(metadata, meta)
+
+       mctx, buildErr := buildMeasureContext(p.measureService, p.log, queryCriteria, "measure")
+       if buildErr != nil {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v", buildErr))
+               return
        }
-       ml := p.log.Named("measure", queryCriteria.Groups[0], queryCriteria.Name)
-       if e := ml.Debug(); e.Enabled() {
+       if e := mctx.ml.Debug(); e.Enabled() {
                e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received a query event")
        }
 
-       plan, err := logical_measure.Analyze(queryCriteria, metadata, schemas, ecc)
-       if err != nil {
-               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for measure %s: %v", queryCriteria.GetName(), err))
+       mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, mctx)
+       if execErr != nil {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v", execErr))
                return
        }
+       defer func() {
+               if closeErr := mIterator.Close(); closeErr != nil {
+                       mctx.ml.Error().Err(closeErr).Dur("latency", time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan")
+               }
+       }()
+
        var tracer *query.Tracer
        var span *query.Span
        if queryCriteria.Trace {
@@ -286,26 +377,7 @@ func (p *measureQueryProcessor) executeQuery(ctx context.Context, queryCriteria
                }()
        }
 
-       if e := ml.Debug(); e.Enabled() {
-               e.Str("plan", plan.String()).Msg("query plan")
-       }
-
-       mIterator, err := plan.(executor.MeasureExecutable).Execute(ctx)
-       if err != nil {
-               ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to query")
-               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to execute the query plan for measure %s: %v", queryCriteria.GetName(), err))
-               return
-       }
-       defer func() {
-               if err = mIterator.Close(); err != nil {
-                       ml.Error().Err(err).Dur("latency", time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan")
-                       if span != nil {
-                               span.Error(fmt.Errorf("fail to close the query plan: %w", err))
-                       }
-               }
-       }()
-
-       result := make([]*measurev1.DataPoint, 0)
+       var result []*measurev1.DataPoint
        func() {
                var r int
                if tracer != nil {
@@ -320,12 +392,13 @@ func (p *measureQueryProcessor) executeQuery(ctx context.Context, queryCriteria
                        r++
                        current := mIterator.Current()
                        if len(current) > 0 {
-                               result = append(result, current[0])
+                               result = append(result, current[0].GetDataPoint())
                        }
                }
        }()
+
        qr := &measurev1.QueryResponse{DataPoints: result}
-       if e := ml.Debug(); e.Enabled() {
+       if e := mctx.ml.Debug(); e.Enabled() {
                e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
        }
        resp = bus.NewMessage(bus.MessageID(now), qr)
@@ -350,6 +423,118 @@ func handleResponse(resp bus.Message) ([]*measurev1.DataPoint, *common.Error) {
        }
 }
 
+type measureInternalQueryProcessor struct {
+       measureService measure.Service
+       *queryService
+       *bus.UnImplementedHealthyListener
+}
+
+func (p *measureInternalQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
+       internalRequest, ok := message.Data().(*measurev1.InternalQueryRequest)
+       n := time.Now()
+       now := n.UnixNano()
+       if !ok {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type"))
+               return
+       }
+       queryCriteria := internalRequest.GetRequest()
+       if queryCriteria == nil {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("query request is nil"))
+               return
+       }
+       // Handle RewriteAggTopNResult: double the top number for initial query
+       if queryCriteria.RewriteAggTopNResult {
+               queryCriteria.Top.Number *= 2
+       }
+       defer func() {
+               if recoverErr := recover(); recoverErr != nil {
+                       p.log.Error().Interface("err", recoverErr).RawJSON("req", logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
+                       resp = bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
+               }
+       }()
+
+       mctx, buildErr := buildMeasureContext(p.measureService, p.log, queryCriteria, "internal-measure")
+       if buildErr != nil {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v", buildErr))
+               return
+       }
+       if e := mctx.ml.Debug(); e.Enabled() {
+               e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received an internal query event")
+       }
+
+       mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, mctx)
+       if execErr != nil {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v", execErr))
+               return
+       }
+       defer func() {
+               if closeErr := mIterator.Close(); closeErr != nil {
+                       mctx.ml.Error().Err(closeErr).Dur("latency", time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan")
+               }
+       }()
+
+       var tracer *query.Tracer
+       var span *query.Span
+       if queryCriteria.Trace {
+               tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+               span, ctx = tracer.StartSpan(ctx, "data-%s", p.queryService.nodeID)
+               span.Tag("plan", plan.String())
+               defer func() {
+                       respData := resp.Data()
+                       switch d := respData.(type) {
+                       case *measurev1.InternalQueryResponse:
+                               span.Tag("resp_count", fmt.Sprintf("%d", len(d.DataPoints)))
+                               d.Trace = tracer.ToProto()
+                               span.Stop()
+                       case *common.Error:
+                               span.Error(errors.New(d.Error()))
+                               span.Stop()
+                               resp = bus.NewMessage(bus.MessageID(now), &measurev1.InternalQueryResponse{Trace: tracer.ToProto()})
+                       default:
+                               panic("unexpected data type")
+                       }
+               }()
+       }
+
+       result := collectInternalDataPoints(mIterator)
+
+       // Handle RewriteAggTopNResult: rewrite query to get original data with Timestamp
+       if queryCriteria.RewriteAggTopNResult && len(result) > 0 {
+               groupByTags := getGroupByTags(queryCriteria)
+               tagValueMap := extractTagValuesFromInternalDataPoints(result, groupByTags)
+               rewrittenCriteria, rewriteErr := rewriteCriteria(tagValueMap)
+               if rewriteErr != nil {
+                       mctx.ml.Error().Err(rewriteErr).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
+               } else {
+                       rewriteQueryCriteria := buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria)
+                       rewriteIterator, _, rewriteExecErr := executeMeasurePlan(ctx, rewriteQueryCriteria, mctx)
+                       if rewriteExecErr != nil {
+                               mctx.ml.Error().Err(rewriteExecErr).RawJSON("req", logger.Proto(rewriteQueryCriteria)).Msg("fail to execute the rewrite query plan")
+                       } else {
+                               defer func() {
+                                       if closeErr := rewriteIterator.Close(); closeErr != nil {
+                                               mctx.ml.Error().Err(closeErr).Msg("fail to close the rewrite query plan")
+                                       }
+                               }()
+                               result = collectInternalDataPoints(rewriteIterator)
+                       }
+               }
+       }
+
+       qr := &measurev1.InternalQueryResponse{DataPoints: result}
+       if e := mctx.ml.Debug(); e.Enabled() {
+               e.RawJSON("ret", logger.Proto(qr)).Msg("got an internal measure response")
+       }
+       resp = bus.NewMessage(bus.MessageID(now), qr)
+       if !queryCriteria.Trace && p.slowQuery > 0 {
+               latency := time.Since(n)
+               if latency > p.slowQuery {
+                       p.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(queryCriteria)).Int("resp_count", len(result)).Msg("internal measure slow query")
+               }
+       }
+       return
+}
+
 func rewriteCriteria(tagValueMap map[string][]*modelv1.TagValue) (*modelv1.Criteria, error) {
        var tagConditions []*modelv1.Condition
        for tagName, tagValues := range tagValueMap {
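
The RewriteAggTopNResult path above runs two passes. A compressed outline
(the step comments paraphrase the code; the reading of why the top number is
doubled is an inference, not stated in the commit):

    package sketch

    // aggTopNOutline mirrors measureInternalQueryProcessor.Rev above. The
    // *2 widening comes from the code; a plausible reading is that each
    // data node only sees its own shards, so extra candidates reduce the
    // risk that a globally top-N group is cut off by a local limit.
    func aggTopNOutline() {
            // 1. Widen the aggregated query: queryCriteria.Top.Number *= 2.
            // 2. Execute it and collect the winning groups' tag values
            //    (getGroupByTags + extractTagValuesFromInternalDataPoints).
            // 3. Rewrite the criteria to match only those tag values
            //    (rewriteCriteria + buildRewriteQueryCriteria).
            // 4. Re-execute and return the raw, timestamped data points for
            //    exactly those groups.
    }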
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 90a49bbc..33556434 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -176,7 +176,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
                        r++
                        current := mIterator.Current()
                        if len(current) > 0 {
-                               result = append(result, current[0])
+                               result = append(result, current[0].GetDataPoint())
                        }
                }
        }()
diff --git a/banyand/query/query.go b/banyand/query/query.go
index b5e1451a..4985f47e 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -42,6 +42,7 @@ type queryService struct {
        log         *logger.Logger
        sqp         *streamQueryProcessor
        mqp         *measureQueryProcessor
+       imqp        *measureInternalQueryProcessor
        nqp         *topNQueryProcessor
        tqp         *traceQueryProcessor
        nodeID      string
@@ -61,6 +62,11 @@ func NewService(_ context.Context, streamService stream.Service, measureService
                measureService: measureService,
                queryService:   svc,
        }
+       // internal measure query processor for distributed query
+       svc.imqp = &measureInternalQueryProcessor{
+               measureService: measureService,
+               queryService:   svc,
+       }
        // stream query processor
        svc.sqp = &streamQueryProcessor{
                streamService: streamService,
@@ -94,6 +100,7 @@ func (q *queryService) PreRun(ctx context.Context) error {
        return multierr.Combine(
                q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
                q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
+               q.pipeline.Subscribe(data.TopicInternalMeasureQuery, q.imqp),
                q.pipeline.Subscribe(data.TopicTopNQuery, q.nqp),
                q.pipeline.Subscribe(data.TopicTraceQuery, q.tqp),
        )
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 21081768..814da7f7 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -40,6 +40,9 @@
 - [banyandb/measure/v1/query.proto](#banyandb_measure_v1_query-proto)
     - [DataPoint](#banyandb-measure-v1-DataPoint)
     - [DataPoint.Field](#banyandb-measure-v1-DataPoint-Field)
+    - [InternalDataPoint](#banyandb-measure-v1-InternalDataPoint)
+    - [InternalQueryRequest](#banyandb-measure-v1-InternalQueryRequest)
+    - [InternalQueryResponse](#banyandb-measure-v1-InternalQueryResponse)
     - [QueryRequest](#banyandb-measure-v1-QueryRequest)
     - [QueryRequest.Aggregation](#banyandb-measure-v1-QueryRequest-Aggregation)
    - [QueryRequest.FieldProjection](#banyandb-measure-v1-QueryRequest-FieldProjection)
@@ -907,6 +910,56 @@ DataPoint is stored in Measures
 
 
 
+<a name="banyandb-measure-v1-InternalDataPoint"></a>
+
+### InternalDataPoint
+InternalDataPoint wraps DataPoint with shard information for internal use.
+Used in distributed query to distinguish data from different shards.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| data_point | [DataPoint](#banyandb-measure-v1-DataPoint) |  | The actual data point |
+| shard_id | [uint32](#uint32) |  | The shard id where this data point comes from |
+
+
+
+
+
+
+<a name="banyandb-measure-v1-InternalQueryRequest"></a>
+
+### InternalQueryRequest
+InternalQueryRequest is the internal request for distributed query.
+Wraps QueryRequest for extensibility.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| request | [QueryRequest](#banyandb-measure-v1-QueryRequest) |  | The actual query request |
+
+
+
+
+
+
+<a name="banyandb-measure-v1-InternalQueryResponse"></a>
+
+### InternalQueryResponse
+InternalQueryResponse is the internal response for distributed query.
+Contains shard information for proper deduplication.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| data_points | [InternalDataPoint](#banyandb-measure-v1-InternalDataPoint) | repeated | data_points with shard information |
+| trace | [banyandb.common.v1.Trace](#banyandb-common-v1-Trace) |  | trace contains the trace information of the query when trace is enabled |
+
+
+
+
+
+
 <a name="banyandb-measure-v1-QueryRequest"></a>
 
 ### QueryRequest
@@ -4962,6 +5015,7 @@ WriteResponse is the response contract for write
 | Method Name | Request Type | Response Type | Description |
 | ----------- | ------------ | ------------- | ------------|
 | Query | [QueryRequest](#banyandb-measure-v1-QueryRequest) | [QueryResponse](#banyandb-measure-v1-QueryResponse) |  |
+| InternalQuery | [InternalQueryRequest](#banyandb-measure-v1-InternalQueryRequest) | [InternalQueryResponse](#banyandb-measure-v1-InternalQueryResponse) | InternalQuery is used for internal distributed query between liaison and data nodes. Returns InternalQueryResponse with shard information for proper deduplication. |
 | Write | [WriteRequest](#banyandb-measure-v1-WriteRequest) stream | [WriteResponse](#banyandb-measure-v1-WriteResponse) stream |  |
 | TopN | [TopNRequest](#banyandb-measure-v1-TopNRequest) | [TopNResponse](#banyandb-measure-v1-TopNResponse) |  |
 | DeleteExpiredSegments | [DeleteExpiredSegmentsRequest](#banyandb-measure-v1-DeleteExpiredSegmentsRequest) | [DeleteExpiredSegmentsResponse](#banyandb-measure-v1-DeleteExpiredSegmentsResponse) |  |
diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go
index 9953f538..5aef8b14 100644
--- a/pkg/query/executor/interface.go
+++ b/pkg/query/executor/interface.go
@@ -49,7 +49,7 @@ type MeasureExecutionContext interface {
 type MIterator interface {
        Next() bool
 
-       Current() []*measurev1.DataPoint
+       Current() []*measurev1.InternalDataPoint
 
        Close() error
 }
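
With Current() returning InternalDataPoint wrappers, consumers drain the
iterator as below; this mirrors the pattern used by the processors in this
commit, though drain itself is illustrative:

    package sketch

    import (
            measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
            "github.com/apache/skywalking-banyandb/pkg/query/executor"
    )

    // drain takes the first element of each batch and unwraps the
    // DataPoint; the shard id stays available on the wrapper for callers
    // that still need it.
    func drain(it executor.MIterator) ([]*measurev1.DataPoint, error) {
            var out []*measurev1.DataPoint
            for it.Next() {
                    if cur := it.Current(); len(cur) > 0 {
                            out = append(out, cur[0].GetDataPoint())
                    }
            }
            return out, it.Close()
    }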
diff --git a/pkg/query/logical/measure/measure_plan.go b/pkg/query/logical/measure/measure_plan.go
index d9bc6a68..b6e6a0a3 100644
--- a/pkg/query/logical/measure/measure_plan.go
+++ b/pkg/query/logical/measure/measure_plan.go
@@ -98,7 +98,7 @@ func (l *limitIterator) Close() error {
        return l.inner.Close()
 }
 
-func (l *limitIterator) Current() []*measurev1.DataPoint {
+func (l *limitIterator) Current() []*measurev1.InternalDataPoint {
        return l.inner.Current()
 }
 
diff --git a/pkg/query/logical/measure/measure_plan_aggregation.go b/pkg/query/logical/measure/measure_plan_aggregation.go
index e304119e..ec2b06a6 100644
--- a/pkg/query/logical/measure/measure_plan_aggregation.go
+++ b/pkg/query/logical/measure/measure_plan_aggregation.go
@@ -160,14 +160,16 @@ func (ami *aggGroupIterator[N]) Next() bool {
        return ami.prev.Next()
 }
 
-func (ami *aggGroupIterator[N]) Current() []*measurev1.DataPoint {
+func (ami *aggGroupIterator[N]) Current() []*measurev1.InternalDataPoint {
        if ami.err != nil {
                return nil
        }
        ami.aggrFunc.Reset()
        group := ami.prev.Current()
        var resultDp *measurev1.DataPoint
-       for _, dp := range group {
+       var shardID uint32
+       for _, idp := range group {
+               dp := idp.GetDataPoint()
                value := dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
                        GetValue()
                v, err := aggregation.FromFieldValue[N](value)
@@ -179,6 +181,7 @@ func (ami *aggGroupIterator[N]) Current() []*measurev1.DataPoint {
                if resultDp != nil {
                        continue
                }
+               shardID = idp.GetShardId()
                resultDp = &measurev1.DataPoint{
                        TagFamilies: dp.TagFamilies,
                }
@@ -197,7 +200,7 @@ func (ami *aggGroupIterator[N]) Current() []*measurev1.DataPoint {
                        Value: val,
                },
        }
-       return []*measurev1.DataPoint{resultDp}
+       return []*measurev1.InternalDataPoint{{DataPoint: resultDp, ShardId: shardID}}
 }
 
 func (ami *aggGroupIterator[N]) Close() error {
@@ -232,7 +235,8 @@ func (ami *aggAllIterator[N]) Next() bool {
        var resultDp *measurev1.DataPoint
        for ami.prev.Next() {
                group := ami.prev.Current()
-               for _, dp := range group {
+               for _, idp := range group {
+                       dp := idp.GetDataPoint()
                        value := dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
                                GetValue()
                        v, err := aggregation.FromFieldValue[N](value)
@@ -267,11 +271,12 @@ func (ami *aggAllIterator[N]) Next() bool {
        return true
 }
 
-func (ami *aggAllIterator[N]) Current() []*measurev1.DataPoint {
+func (ami *aggAllIterator[N]) Current() []*measurev1.InternalDataPoint {
        if ami.result == nil {
                return nil
        }
-       return []*measurev1.DataPoint{ami.result}
+       // For aggregation across all data, shard ID is not applicable
+       return []*measurev1.InternalDataPoint{{DataPoint: ami.result, ShardId: 0}}
 }
 
 func (ami *aggAllIterator[N]) Close() error {
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index e401b970..d81d3f68 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -27,6 +27,7 @@ import (
        "go.uber.org/multierr"
        "google.golang.org/protobuf/proto"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
@@ -271,37 +272,38 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e
                        }
                }()
        }
-       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery,
-               bus.NewMessageWithNodeSelectors(bus.MessageID(dctx.TimeRange().Begin.Nanos), dctx.NodeSelectors(), dctx.TimeRange(), queryRequest))
-       if err != nil {
-               return nil, err
+       internalRequest := &measurev1.InternalQueryRequest{Request: queryRequest}
+       ff, broadcastErr := dctx.Broadcast(defaultQueryTimeout, data.TopicInternalMeasureQuery,
+               bus.NewMessageWithNodeSelectors(bus.MessageID(dctx.TimeRange().Begin.Nanos), dctx.NodeSelectors(), dctx.TimeRange(), internalRequest))
+       if broadcastErr != nil {
+               return nil, broadcastErr
        }
        var see []sort.Iterator[*comparableDataPoint]
-       var pushedDownAggDps []*measurev1.DataPoint
+       var pushedDownAggDps []*measurev1.InternalDataPoint
        var responseCount int
        var dataPointCount int
        for _, f := range ff {
                if m, getErr := f.Get(); getErr != nil {
                        err = multierr.Append(err, getErr)
                } else {
-                       d := m.Data()
-                       if d == nil {
-                               continue
-                       }
-                       resp := d.(*measurev1.QueryResponse)
-                       responseCount++
-                       if span != nil {
-                               span.AddSubTrace(resp.Trace)
-                       }
-                       if t.needCompletePushDownAgg {
-                               pushedDownAggDps = append(pushedDownAggDps, resp.DataPoints...)
-                               dataPointCount += len(resp.DataPoints)
-                               continue
+                       switch d := m.Data().(type) {
+                       case *measurev1.InternalQueryResponse:
+                               responseCount++
+                               if span != nil {
+                                       span.AddSubTrace(d.Trace)
+                               }
+                               if t.needCompletePushDownAgg {
+                                       pushedDownAggDps = append(pushedDownAggDps, d.DataPoints...)
+                                       dataPointCount += len(d.DataPoints)
+                                       continue
+                               }
+                               dataPointCount += len(d.DataPoints)
+                               see = append(see,
+                                       newSortableElements(d.DataPoints,
+                                               t.sortByTime, t.sortTagSpec))
+                       case *common.Error:
+                               err = multierr.Append(err, fmt.Errorf("data node error: %s", d.Error()))
                        }
-                       dataPointCount += len(resp.DataPoints)
-                       see = append(see,
-                               newSortableElements(resp.DataPoints,
-                                       t.sortByTime, t.sortTagSpec))
                }
        }
        if span != nil {
@@ -309,7 +311,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e
                span.Tagf("data_point_count", "%d", dataPointCount)
        }
        if t.needCompletePushDownAgg {
-               deduplicatedDps, dedupErr := deduplicateAggregatedDataPoints(pushedDownAggDps, t.groupByTagsRefs)
+               deduplicatedDps, dedupErr := deduplicateAggregatedDataPointsWithShard(pushedDownAggDps, t.groupByTagsRefs)
                if dedupErr != nil {
                        return nil, multierr.Append(err, dedupErr)
                }
@@ -347,25 +349,26 @@ func (t *distributedPlan) Limit(maxVal int) {
 var _ sort.Comparable = (*comparableDataPoint)(nil)
 
 type comparableDataPoint struct {
-       *measurev1.DataPoint
+       *measurev1.InternalDataPoint
        sortField []byte
 }
 
-func newComparableElement(e *measurev1.DataPoint, sortByTime bool, sortTagSpec logical.TagSpec) (*comparableDataPoint, error) {
+func newComparableElement(idp *measurev1.InternalDataPoint, sortByTime bool, sortTagSpec logical.TagSpec) (*comparableDataPoint, error) {
+       dp := idp.GetDataPoint()
        var sortField []byte
        if sortByTime {
-               sortField = convert.Uint64ToBytes(uint64(e.Timestamp.AsTime().UnixNano()))
+               sortField = convert.Uint64ToBytes(uint64(dp.Timestamp.AsTime().UnixNano()))
        } else {
                var err error
-               sortField, err = pbv1.MarshalTagValue(e.TagFamilies[sortTagSpec.TagFamilyIdx].Tags[sortTagSpec.TagIdx].Value)
+               sortField, err = pbv1.MarshalTagValue(dp.TagFamilies[sortTagSpec.TagFamilyIdx].Tags[sortTagSpec.TagIdx].Value)
                if err != nil {
                        return nil, err
                }
        }
 
        return &comparableDataPoint{
-               DataPoint: e,
-               sortField: sortField,
+               InternalDataPoint: idp,
+               sortField:         sortField,
        }, nil
 }
 
@@ -377,13 +380,13 @@ var _ sort.Iterator[*comparableDataPoint] = (*sortableElements)(nil)
 
 type sortableElements struct {
        cur          *comparableDataPoint
-       dataPoints   []*measurev1.DataPoint
+       dataPoints   []*measurev1.InternalDataPoint
        sortTagSpec  logical.TagSpec
        index        int
        isSortByTime bool
 }
 
-func newSortableElements(dataPoints []*measurev1.DataPoint, isSortByTime bool, sortTagSpec logical.TagSpec) *sortableElements {
+func newSortableElements(dataPoints []*measurev1.InternalDataPoint, isSortByTime bool, sortTagSpec logical.TagSpec) *sortableElements {
        return &sortableElements{
                dataPoints:   dataPoints,
                isSortByTime: isSortByTime,
@@ -396,8 +399,8 @@ func (*sortableElements) Close() error {
 }
 
 func (s *sortableElements) Next() bool {
-       return s.iter(func(e *measurev1.DataPoint) (*comparableDataPoint, error) {
-               return newComparableElement(e, s.isSortByTime, s.sortTagSpec)
+       return s.iter(func(idp *measurev1.InternalDataPoint) (*comparableDataPoint, error) {
+               return newComparableElement(idp, s.isSortByTime, s.sortTagSpec)
        })
 }
 
@@ -405,7 +408,7 @@ func (s *sortableElements) Val() *comparableDataPoint {
        return s.cur
 }
 
-func (s *sortableElements) iter(fn func(*measurev1.DataPoint) (*comparableDataPoint, error)) bool {
+func (s *sortableElements) iter(fn func(*measurev1.InternalDataPoint) (*comparableDataPoint, error)) bool {
        if s.index >= len(s.dataPoints) {
                return false
        }
@@ -423,8 +426,8 @@ var _ executor.MIterator = (*sortedMIterator)(nil)
 type sortedMIterator struct {
        sort.Iterator[*comparableDataPoint]
        data        *list.List
-       uniqueData  map[uint64]*measurev1.DataPoint
-       cur         *measurev1.DataPoint
+       uniqueData  map[uint64]*measurev1.InternalDataPoint
+       cur         *measurev1.InternalDataPoint
        initialized bool
        closed      bool
 }
@@ -439,7 +442,7 @@ func (s *sortedMIterator) init() {
                return
        }
        s.data = list.New()
-       s.uniqueData = make(map[uint64]*measurev1.DataPoint)
+       s.uniqueData = make(map[uint64]*measurev1.InternalDataPoint)
        s.loadDps()
 }
 
@@ -453,9 +456,9 @@ func (s *sortedMIterator) Next() bool {
                        return false
                }
        }
-       dp := s.data.Front()
-       s.data.Remove(dp)
-       s.cur = dp.Value.(*measurev1.DataPoint)
+       idp := s.data.Front()
+       s.data.Remove(idp)
+       s.cur = idp.Value.(*measurev1.InternalDataPoint)
        return true
 }
 
@@ -467,7 +470,7 @@ func (s *sortedMIterator) loadDps() {
                delete(s.uniqueData, k)
        }
        first := s.Iterator.Val()
-       s.uniqueData[hashDataPoint(first.DataPoint)] = first.DataPoint
+       s.uniqueData[hashDataPoint(first.GetDataPoint())] = first.InternalDataPoint
        for {
                if !s.Iterator.Next() {
                        s.closed = true
@@ -475,13 +478,13 @@ func (s *sortedMIterator) loadDps() {
                }
                v := s.Iterator.Val()
                if bytes.Equal(first.SortedField(), v.SortedField()) {
-                       key := hashDataPoint(v.DataPoint)
+                       key := hashDataPoint(v.GetDataPoint())
                        if existed, ok := s.uniqueData[key]; ok {
-                               if v.DataPoint.Version > existed.Version {
-                                       s.uniqueData[key] = v.DataPoint
+                               if v.GetDataPoint().Version > existed.GetDataPoint().Version {
+                                       s.uniqueData[key] = v.InternalDataPoint
                                }
                        } else {
-                               s.uniqueData[key] = v.DataPoint
+                               s.uniqueData[key] = v.InternalDataPoint
                        }
                } else {
                        break
@@ -492,8 +495,12 @@ func (s *sortedMIterator) loadDps() {
        }
 }
 
-func (s *sortedMIterator) Current() []*measurev1.DataPoint {
-       return []*measurev1.DataPoint{s.cur}
+func (s *sortedMIterator) Current() []*measurev1.InternalDataPoint {
+       return []*measurev1.InternalDataPoint{s.cur}
+}
+
+func (s *sortedMIterator) Close() error {
+       return nil
 }
 
 const (
@@ -512,7 +519,7 @@ func hashDataPoint(dp *measurev1.DataPoint) uint64 {
 }
 
 type pushedDownAggregatedIterator struct {
-       dataPoints []*measurev1.DataPoint
+       dataPoints []*measurev1.InternalDataPoint
        index      int
 }
 
@@ -524,33 +531,47 @@ func (s *pushedDownAggregatedIterator) Next() bool {
        return true
 }
 
-func (s *pushedDownAggregatedIterator) Current() []*measurev1.DataPoint {
+func (s *pushedDownAggregatedIterator) Current() []*measurev1.InternalDataPoint {
        if s.index == 0 || s.index > len(s.dataPoints) {
                return nil
        }
-       return []*measurev1.DataPoint{s.dataPoints[s.index-1]}
+       return []*measurev1.InternalDataPoint{s.dataPoints[s.index-1]}
 }
 
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
 
-// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas.
-func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {
+// deduplicateAggregatedDataPointsWithShard removes duplicate aggregated results from multiple replicas
+// of the same shard, while preserving results from different shards.
+func deduplicateAggregatedDataPointsWithShard(dataPoints []*measurev1.InternalDataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.InternalDataPoint, error) {
        if len(groupByTagsRefs) == 0 {
                return dataPoints, nil
        }
+       // key = hash(shard_id, group_key)
+       // Same shard with same group key will be deduplicated
+       // Different shards with same group key will be preserved
        groupMap := make(map[uint64]struct{})
-       result := make([]*measurev1.DataPoint, 0, len(dataPoints))
-       for _, dp := range dataPoints {
-               key, err := formatGroupByKey(dp, groupByTagsRefs)
-               if err != nil {
-                       return nil, err
+       result := make([]*measurev1.InternalDataPoint, 0, len(dataPoints))
+       for _, idp := range dataPoints {
+               groupKey, keyErr := formatGroupByKey(idp.DataPoint, groupByTagsRefs)
+               if keyErr != nil {
+                       return nil, keyErr
                }
+               // Include shard_id in key calculation
+               key := hashWithShard(uint64(idp.ShardId), groupKey)
                if _, exists := groupMap[key]; !exists {
                        groupMap[key] = struct{}{}
-                       result = append(result, dp)
+                       result = append(result, idp)
                }
        }
        return result, nil
 }
+
+// hashWithShard combines shard_id and group_key into a single hash.
+func hashWithShard(shardID, groupKey uint64) uint64 {
+       h := uint64(offset64)
+       h = (h ^ shardID) * prime64
+       h = (h ^ groupKey) * prime64
+       return h
+}
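
For reference, the shard-aware key is a plain FNV-1a fold of the shard ID followed by the group key; folding the shard ID in first is what lets identical group keys from different shards survive deduplication, while replicas of the same shard collide on the same key. A minimal, self-contained sketch of that behavior, assuming offset64 and prime64 are the standard 64-bit FNV-1a constants defined elsewhere in this file:

    package main

    import "fmt"

    // Assumed values: the standard 64-bit FNV-1a constants, matching the
    // package-level offset64 and prime64 referenced above.
    const (
            offset64 = 14695981039346656037
            prime64  = 1099511628211
    )

    // hashWithShard mirrors the helper added above: the shard ID is folded
    // into the hash before the group key.
    func hashWithShard(shardID, groupKey uint64) uint64 {
            h := uint64(offset64)
            h = (h ^ shardID) * prime64
            h = (h ^ groupKey) * prime64
            return h
    }

    func main() {
            groupKey := uint64(42) // the same aggregated group on every replica
            fmt.Println(hashWithShard(1, groupKey) == hashWithShard(1, groupKey)) // true: replicas deduplicate
            fmt.Println(hashWithShard(1, groupKey) == hashWithShard(2, groupKey)) // false: shards stay distinct
    }
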
diff --git a/pkg/query/logical/measure/measure_plan_distributed_test.go b/pkg/query/logical/measure/measure_plan_distributed_test.go
index e8e1dfb2..7468b4fd 100644
--- a/pkg/query/logical/measure/measure_plan_distributed_test.go
+++ b/pkg/query/logical/measure/measure_plan_distributed_test.go
@@ -25,7 +25,10 @@ import (
        "google.golang.org/protobuf/testing/protocmp"
        timestamppb "google.golang.org/protobuf/types/known/timestamppb"
 
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
 type mockIterator struct {
@@ -46,6 +49,19 @@ func (m *mockIterator) Close() error {
        return nil
 }
 
+func makeComparableDP(sid uint64, seconds, nanos int64, version int64, sortField byte) *comparableDataPoint {
+       return &comparableDataPoint{
+               InternalDataPoint: &measurev1.InternalDataPoint{
+                       DataPoint: &measurev1.DataPoint{
+                               Sid:       sid,
+                       Timestamp: &timestamppb.Timestamp{Seconds: seconds, Nanos: int32(nanos)},
+                               Version:   version,
+                       },
+               },
+               sortField: []byte{sortField},
+       }
+}
+
 func TestSortedMIterator(t *testing.T) {
        testCases := []struct {
                name string
@@ -60,8 +76,8 @@ func TestSortedMIterator(t *testing.T) {
                {
                        name: "all data points are the same",
                        data: []*comparableDataPoint{
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
+                               makeComparableDP(1, 1, 1, 1, 1),
+                               makeComparableDP(1, 1, 1, 1, 1),
                        },
                        want: []*measurev1.DataPoint{
                                {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1},
@@ -70,8 +86,8 @@ func TestSortedMIterator(t *testing.T) {
                {
                        name: "identical data points with different sort 
fields",
                        data: []*comparableDataPoint{
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{2}},
+                               makeComparableDP(1, 1, 1, 1, 1),
+                               makeComparableDP(1, 1, 1, 1, 2),
                        },
                        want: []*measurev1.DataPoint{
                                {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1},
@@ -81,8 +97,8 @@ func TestSortedMIterator(t *testing.T) {
                {
                        name: "different data points with different sort 
fields",
                        data: []*comparableDataPoint{
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 1}, sortField: []byte{2}},
+                               makeComparableDP(1, 1, 1, 1, 1),
+                               makeComparableDP(1, 2, 2, 1, 2),
                        },
                        want: []*measurev1.DataPoint{
                                {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1},
@@ -92,9 +108,9 @@ func TestSortedMIterator(t *testing.T) {
                {
                        name: "identical data points with different versions",
                        data: []*comparableDataPoint{
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 1}, sortField: []byte{1}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2}, sortField: []byte{1}},
+                               makeComparableDP(1, 1, 1, 1, 1),
+                               makeComparableDP(1, 2, 2, 1, 1),
+                               makeComparableDP(1, 1, 1, 2, 1),
                        },
                        want: []*measurev1.DataPoint{
                                {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2},
@@ -102,17 +118,17 @@ func TestSortedMIterator(t *testing.T) {
                        },
                },
                {
-                       name: "identical data points with different versions",
+                       name: "multiple sids with different versions",
                        data: []*comparableDataPoint{
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 2, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2}, sortField: []byte{1}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 2}, sortField: []byte{2}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 1}, sortField: []byte{2}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 2, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 2}, sortField: []byte{2}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 3, Nanos: 3}, Version: 1}, sortField: []byte{3}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 3, Nanos: 3}, Version: 2}, sortField: []byte{3}},
-                               {DataPoint: &measurev1.DataPoint{Sid: 3, Timestamp: &timestamppb.Timestamp{Seconds: 3, Nanos: 3}, Version: 2}, sortField: []byte{3}},
+                               makeComparableDP(1, 1, 1, 1, 1),
+                               makeComparableDP(2, 1, 1, 1, 1),
+                               makeComparableDP(1, 1, 1, 2, 1),
+                               makeComparableDP(1, 2, 2, 2, 2),
+                               makeComparableDP(1, 2, 2, 1, 2),
+                               makeComparableDP(2, 2, 2, 2, 2),
+                               makeComparableDP(1, 3, 3, 1, 3),
+                               makeComparableDP(1, 3, 3, 2, 3),
+                               makeComparableDP(3, 3, 3, 2, 3),
                        },
                        want: []*measurev1.DataPoint{
                                {Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2},
@@ -135,7 +151,7 @@ func TestSortedMIterator(t *testing.T) {
 
                        got := make([]*measurev1.DataPoint, 0)
                        for iter.Next() {
-                               got = append(got, iter.Current()[0])
+                               got = append(got, iter.Current()[0].GetDataPoint())
                        }
 
                        slices.SortFunc(got, func(a, b *measurev1.DataPoint) int {
@@ -158,3 +174,131 @@ func TestSortedMIterator(t *testing.T) {
                })
        }
 }
+
+func makeInternalDP(shardID uint32, tagValue string) *measurev1.InternalDataPoint {
+       return &measurev1.InternalDataPoint{
+               ShardId: shardID,
+               DataPoint: &measurev1.DataPoint{
+                       TagFamilies: []*modelv1.TagFamily{
+                               {
+                                       Name: "default",
+                                       Tags: []*modelv1.Tag{
+                                               {Key: "group_tag", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: tagValue}}}},
+                                       },
+                               },
+                       },
+               },
+       }
+}
+
+func makeGroupByTagsRefs() [][]*logical.TagRef {
+       return [][]*logical.TagRef{
+               {
+                       {
+                               Tag:  &logical.Tag{},
+                               Spec: &logical.TagSpec{TagFamilyIdx: 0, TagIdx: 0, Spec: &databasev1.TagSpec{Type: databasev1.TagType_TAG_TYPE_STRING}},
+                       },
+               },
+       }
+}
+
+func TestDeduplicateAggregatedDataPointsWithShard(t *testing.T) {
+       testCases := []struct {
+               name            string
+               dataPoints      []*measurev1.InternalDataPoint
+               groupByTagsRefs [][]*logical.TagRef
+               wantShardIDs    []uint32
+               wantTagValues   []string
+               wantLen         int
+               expectErr       bool
+       }{
+               {
+                       name:            "empty groupByTagsRefs returns all data points",
+                       dataPoints:      []*measurev1.InternalDataPoint{makeInternalDP(1, "a"), makeInternalDP(2, "a")},
+                       groupByTagsRefs: nil,
+                       wantLen:         2,
+                       wantShardIDs:    []uint32{1, 2},
+                       wantTagValues:   []string{"a", "a"},
+               },
+               {
+                       name:            "deduplicate replicas of same shard with same group key",
+                       dataPoints:      []*measurev1.InternalDataPoint{makeInternalDP(1, "a"), makeInternalDP(1, "a")},
+                       groupByTagsRefs: makeGroupByTagsRefs(),
+                       wantLen:         1,
+                       wantShardIDs:    []uint32{1},
+                       wantTagValues:   []string{"a"},
+               },
+               {
+                       name:            "preserve data from different shards with same group key",
+                       dataPoints:      []*measurev1.InternalDataPoint{makeInternalDP(1, "a"), makeInternalDP(2, "a")},
+                       groupByTagsRefs: makeGroupByTagsRefs(),
+                       wantLen:         2,
+                       wantShardIDs:    []uint32{1, 2},
+                       wantTagValues:   []string{"a", "a"},
+               },
+               {
+                       name: "preserve data from same shard with different group keys",
+                       dataPoints: []*measurev1.InternalDataPoint{
+                               makeInternalDP(1, "a"),
+                               makeInternalDP(1, "b"),
+                       },
+                       groupByTagsRefs: makeGroupByTagsRefs(),
+                       wantLen:         2,
+                       wantShardIDs:    []uint32{1, 1},
+                       wantTagValues:   []string{"a", "b"},
+               },
+               {
+                       name: "complex scenario: multiple shards and group keys with replicas",
+                       dataPoints: []*measurev1.InternalDataPoint{
+                               makeInternalDP(1, "a"),
+                               makeInternalDP(1, "a"), // replica, should be deduplicated
+                               makeInternalDP(2, "a"), // different shard, keep
+                               makeInternalDP(1, "b"), // different group key, 
keep
+                               makeInternalDP(2, "b"), // different shard, keep
+                               makeInternalDP(2, "b"), // replica, should be deduplicated
+                       },
+                       groupByTagsRefs: makeGroupByTagsRefs(),
+                       wantLen:         4,
+                       wantShardIDs:    []uint32{1, 2, 1, 2},
+                       wantTagValues:   []string{"a", "a", "b", "b"},
+               },
+               {
+                       name:            "empty data points",
+                       dataPoints:      []*measurev1.InternalDataPoint{},
+                       groupByTagsRefs: makeGroupByTagsRefs(),
+                       wantLen:         0,
+                       wantShardIDs:    []uint32{},
+                       wantTagValues:   []string{},
+               },
+       }
+
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got, err := deduplicateAggregatedDataPointsWithShard(tc.dataPoints, tc.groupByTagsRefs)
+                       if tc.expectErr {
+                               if err == nil {
+                                       t.Errorf("expected error but got nil")
+                               }
+                               return
+                       }
+                       if err != nil {
+                               t.Errorf("unexpected error: %v", err)
+                               return
+                       }
+                       if len(got) != tc.wantLen {
+                               t.Errorf("got %d data points, want %d", len(got), tc.wantLen)
+                               return
+                       }
+
+                       for i, dp := range got {
+                               if dp.ShardId != tc.wantShardIDs[i] {
+                                       t.Errorf("data point %d: got shard %d, want %d", i, dp.ShardId, tc.wantShardIDs[i])
+                               }
+                               tagValue := dp.DataPoint.GetTagFamilies()[0].GetTags()[0].GetValue().GetStr().GetValue()
+                               if tagValue != tc.wantTagValues[i] {
+                                       t.Errorf("data point %d: got tag value %s, want %s", i, tagValue, tc.wantTagValues[i])
+                               }
+                       }
+               })
+       }
+}
diff --git a/pkg/query/logical/measure/measure_plan_groupby.go b/pkg/query/logical/measure/measure_plan_groupby.go
index 6584fd71..6a54aeee 100644
--- a/pkg/query/logical/measure/measure_plan_groupby.go
+++ b/pkg/query/logical/measure/measure_plan_groupby.go
@@ -129,24 +129,24 @@ func (g *groupBy) hash(ec context.Context) (mit executor.MIterator, err error) {
                err = multierr.Append(err, iter.Close())
        }()
 
-       groupMap := make(map[uint64][]*measurev1.DataPoint)
+       groupMap := make(map[uint64][]*measurev1.InternalDataPoint)
        groupLst := make([]uint64, 0)
        for iter.Next() {
                dataPoints := iter.Current()
-               for _, dp := range dataPoints {
-                       key, innerErr := formatGroupByKey(dp, g.groupByTagsRefs)
+               for _, idp := range dataPoints {
+                       key, innerErr := formatGroupByKey(idp.GetDataPoint(), g.groupByTagsRefs)
                        if innerErr != nil {
                                return nil, innerErr
                        }
                        group, ok := groupMap[key]
                        if !ok {
-                               group = make([]*measurev1.DataPoint, 0)
+                               group = make([]*measurev1.InternalDataPoint, 0)
                                groupLst = append(groupLst, key)
                        }
                        if group == nil {
                                return nil, errors.New("aggregation op does not exist")
                        }
-                       group = append(group, dp)
+                       group = append(group, idp)
                        groupMap[key] = group
                }
        }
@@ -184,12 +184,12 @@ func formatGroupByKey(point *measurev1.DataPoint, groupByTagsRefs [][]*logical.T
 }
 
 type groupIterator struct {
-       groupMap map[uint64][]*measurev1.DataPoint
+       groupMap map[uint64][]*measurev1.InternalDataPoint
        groupLst []uint64
        index    int
 }
 
-func newGroupIterator(groupedMap map[uint64][]*measurev1.DataPoint, groupLst []uint64) executor.MIterator {
+func newGroupIterator(groupedMap map[uint64][]*measurev1.InternalDataPoint, groupLst []uint64) executor.MIterator {
        return &groupIterator{
                groupMap: groupedMap,
                groupLst: groupLst,
@@ -205,7 +205,7 @@ func (gmi *groupIterator) Next() bool {
        return true
 }
 
-func (gmi *groupIterator) Current() []*measurev1.DataPoint {
+func (gmi *groupIterator) Current() []*measurev1.InternalDataPoint {
        key := gmi.groupLst[gmi.index]
        return gmi.groupMap[key]
 }
@@ -218,9 +218,9 @@ func (gmi *groupIterator) Close() error {
 type groupSortIterator struct {
        iter            executor.MIterator
        err             error
-       cdp             *measurev1.DataPoint
+       cdp             *measurev1.InternalDataPoint
        groupByTagsRefs [][]*logical.TagRef
-       current         []*measurev1.DataPoint
+       current         []*measurev1.InternalDataPoint
        index           int
        key             uint64
        closed          bool
@@ -245,12 +245,12 @@ func (gmi *groupSortIterator) Next() bool {
                gmi.current = append(gmi.current, gmi.cdp)
        }
        for {
-               dp, ok := gmi.nextDP()
+               idp, ok := gmi.nextDP()
                if !ok {
                        gmi.closed = true
                        return len(gmi.current) > 0
                }
-               k, err := formatGroupByKey(dp, gmi.groupByTagsRefs)
+               k, err := formatGroupByKey(idp.GetDataPoint(), gmi.groupByTagsRefs)
                if err != nil {
                        gmi.closed = true
                        gmi.err = err
@@ -260,15 +260,15 @@ func (gmi *groupSortIterator) Next() bool {
                        gmi.key = k
                }
                if gmi.key != k {
-                       gmi.cdp = dp
+                       gmi.cdp = idp
                        gmi.key = k
                        return true
                }
-               gmi.current = append(gmi.current, dp)
+               gmi.current = append(gmi.current, idp)
        }
 }
 
-func (gmi *groupSortIterator) Current() []*measurev1.DataPoint {
+func (gmi *groupSortIterator) Current() []*measurev1.InternalDataPoint {
        return gmi.current
 }
 
@@ -277,7 +277,7 @@ func (gmi *groupSortIterator) Close() error {
        return multierr.Combine(gmi.err, gmi.iter.Close())
 }
 
-func (gmi *groupSortIterator) nextDP() (*measurev1.DataPoint, bool) {
+func (gmi *groupSortIterator) nextDP() (*measurev1.InternalDataPoint, bool) {
        if gmi.index < 0 {
                if ok := gmi.iter.Next(); !ok {
                        return nil, false
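
A note on the groupMap/groupLst pair above: it is the common order-preserving grouping idiom, where the map holds the groups and the slice records first-seen key order so iteration stays deterministic. A minimal sketch of the same idiom with plain strings (all names here are illustrative, not from the codebase):

    package main

    import "fmt"

    // groupOrdered groups items by key while remembering first-seen key
    // order, the same shape groupBy.hash uses with hashed group keys and
    // InternalDataPoints.
    func groupOrdered(items []string, keyOf func(string) string) (map[string][]string, []string) {
            groups := make(map[string][]string)
            order := make([]string, 0)
            for _, it := range items {
                    k := keyOf(it)
                    if _, ok := groups[k]; !ok {
                            order = append(order, k)
                    }
                    groups[k] = append(groups[k], it)
            }
            return groups, order
    }

    func main() {
            groups, order := groupOrdered(
                    []string{"svc-a/1", "svc-b/1", "svc-a/2"},
                    func(s string) string { return s[:5] }, // key = service prefix
            )
            for _, k := range order {
                    fmt.Println(k, groups[k]) // svc-a [svc-a/1 svc-a/2], then svc-b [svc-b/1]
            }
    }
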
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index 9f4a2270..f82dee2f 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -24,6 +24,7 @@ import (
 
        "google.golang.org/protobuf/types/known/timestamppb"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -279,7 +280,7 @@ type resultMIterator struct {
        result           model.MeasureQueryResult
        hiddenTags       logical.HiddenTagSet
        err              error
-       current          []*measurev1.DataPoint
+       current          []*measurev1.InternalDataPoint
        projectionTags   []model.TagProjection
        projectionFields []string
        i                int
@@ -364,14 +365,21 @@ func (ei *resultMIterator) Next() bool {
                                })
                        }
                }
-               ei.current = append(ei.current, dp)
+               var shardID common.ShardID
+               if len(r.ShardIDs) > i {
+                       shardID = r.ShardIDs[i]
+               }
+               ei.current = append(ei.current, &measurev1.InternalDataPoint{
+                       DataPoint: dp,
+                       ShardId:   uint32(shardID),
+               })
        }
 
        return true
 }
 
-func (ei *resultMIterator) Current() []*measurev1.DataPoint {
-       return []*measurev1.DataPoint{ei.current[ei.i]}
+func (ei *resultMIterator) Current() []*measurev1.InternalDataPoint {
+       return []*measurev1.InternalDataPoint{ei.current[ei.i]}
 }
 
 func (ei *resultMIterator) Close() error {
diff --git a/pkg/query/logical/measure/measure_plan_top.go b/pkg/query/logical/measure/measure_plan_top.go
index 25fc5062..559b635e 100644
--- a/pkg/query/logical/measure/measure_plan_top.go
+++ b/pkg/query/logical/measure/measure_plan_top.go
@@ -102,12 +102,12 @@ func (g *topOp) Execute(ec context.Context) (mit executor.MIterator, err error)
        g.topNStream.Purge()
        for iter.Next() {
                dpp := iter.Current()
-               for _, dp := range dpp {
-                       value := dp.GetFields()[g.fieldRef.Spec.FieldIdx].
+               for _, idp := range dpp {
+                       value := idp.GetDataPoint().GetFields()[g.fieldRef.Spec.FieldIdx].
                                GetValue().
                                GetInt().
                                GetValue()
-                       g.topNStream.Insert(NewTopElement(dp, value))
+                       g.topNStream.Insert(NewTopElement(idp, value))
                }
        }
        return newTopIterator(g.topNStream.Elements()), nil
@@ -130,8 +130,8 @@ func (ami *topIterator) Next() bool {
        return ami.index < len(ami.elements)
 }
 
-func (ami *topIterator) Current() []*measurev1.DataPoint {
-       return []*measurev1.DataPoint{ami.elements[ami.index].dp}
+func (ami *topIterator) Current() []*measurev1.InternalDataPoint {
+       return []*measurev1.InternalDataPoint{ami.elements[ami.index].idp}
 }
 
 func (ami *topIterator) Close() error {
diff --git a/pkg/query/logical/measure/measure_top.go b/pkg/query/logical/measure/measure_top.go
index 39bec105..f7614e62 100644
--- a/pkg/query/logical/measure/measure_top.go
+++ b/pkg/query/logical/measure/measure_top.go
@@ -27,14 +27,14 @@ import (
 
 // TopElement seals a sortable value and its data point which this value belongs to.
 type TopElement struct {
-       dp    *measurev1.DataPoint
+       idp   *measurev1.InternalDataPoint
        value int64
 }
 
 // NewTopElement returns a TopElement.
-func NewTopElement(dp *measurev1.DataPoint, value int64) TopElement {
+func NewTopElement(idp *measurev1.InternalDataPoint, value int64) TopElement {
        return TopElement{
-               dp:    dp,
+               idp:   idp,
                value: value,
        }
 }
diff --git a/pkg/query/logical/measure/topn_plan_localscan.go b/pkg/query/logical/measure/topn_plan_localscan.go
index be85e760..418577e4 100644
--- a/pkg/query/logical/measure/topn_plan_localscan.go
+++ b/pkg/query/logical/measure/topn_plan_localscan.go
@@ -177,7 +177,7 @@ func (i *localScan) Schema() logical.Schema {
 type topNMIterator struct {
        result  model.MeasureQueryResult
        err     error
-       current []*measurev1.DataPoint
+       current []*measurev1.InternalDataPoint
 }
 
 func (ei *topNMIterator) Next() bool {
@@ -213,6 +213,10 @@ func (ei *topNMIterator) Next() bool {
                        ei.err = multierr.Append(ei.err, errors.WithMessagef(err, "failed to unmarshal topN values[%d]:[%s]%s", i, ts, hex.EncodeToString(fv.GetBinaryData())))
                        continue
                }
+               shardID := uint32(0)
+               if i < len(r.ShardIDs) {
+                       shardID = uint32(r.ShardIDs[i])
+               }
                fieldName, entityNames, values, entities := topNValue.Values()
                for j := range entities {
                        dp := &measurev1.DataPoint{
@@ -240,13 +244,13 @@ func (ei *topNMIterator) Next() bool {
                                        },
                                },
                        })
-                       ei.current = append(ei.current, dp)
+                       ei.current = append(ei.current, &measurev1.InternalDataPoint{DataPoint: dp, ShardId: shardID})
                }
        }
        return true
 }
 
-func (ei *topNMIterator) Current() []*measurev1.DataPoint {
+func (ei *topNMIterator) Current() []*measurev1.InternalDataPoint {
        return ei.current
 }
 
diff --git a/pkg/query/logical/measure/topn_plan_merge.go b/pkg/query/logical/measure/topn_plan_merge.go
index 62bce81d..bb31a99e 100644
--- a/pkg/query/logical/measure/topn_plan_merge.go
+++ b/pkg/query/logical/measure/topn_plan_merge.go
@@ -97,7 +97,7 @@ func (t *topNMerger) Schema() logical.Schema {
 
 type topNMergingIterator struct {
        iters      []executor.MIterator
-       currentDps []*measurev1.DataPoint
+       currentDps []*measurev1.InternalDataPoint
        currentIt  int
 }
 
@@ -125,11 +125,11 @@ func (t *topNMergingIterator) Next() bool {
        return false
 }
 
-func (t *topNMergingIterator) Current() []*measurev1.DataPoint {
+func (t *topNMergingIterator) Current() []*measurev1.InternalDataPoint {
        if len(t.currentDps) == 0 {
                return nil
        }
-       return []*measurev1.DataPoint{t.currentDps[0]}
+       return []*measurev1.InternalDataPoint{t.currentDps[0]}
 }
 
 func (t *topNMergingIterator) Close() error {
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 4aa313ea..246d8c29 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -73,6 +73,7 @@ type MeasureResult struct {
        Error       error
        Timestamps  []int64
        Versions    []int64
+       ShardIDs    []common.ShardID
        TagFamilies []TagFamily
        Fields      []Field
        SID         common.SeriesID
