This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new ff8bbe9a refactor deduplicateAggregatedDataPoints (#943)
ff8bbe9a is described below
commit ff8bbe9af2387f75f64d7ee9a9674305a1404ad7
Author: OmCheeLin <[email protected]>
AuthorDate: Tue Jan 20 21:15:49 2026 +0800
refactor deduplicateAggregatedDataPoints (#943)
---------
Co-authored-by: Gao Hongtao <[email protected]>
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
api/data/data.go | 7 +
api/data/measure.go | 10 +
api/proto/banyandb/measure/v1/query.proto | 25 ++
api/proto/banyandb/measure/v1/rpc.proto | 4 +
banyand/dquery/measure.go | 2 +-
banyand/internal/storage/segment.go | 13 +
banyand/internal/storage/storage.go | 1 +
banyand/measure/block.go | 5 +
banyand/measure/cache_benchmark_test.go | 2 +-
banyand/measure/part.go | 1 +
banyand/measure/query.go | 17 +-
banyand/measure/query_test.go | 7 +-
banyand/query/processor.go | 341 ++++++++++++++++-----
banyand/query/processor_topn.go | 2 +-
banyand/query/query.go | 7 +
docs/api-reference.md | 54 ++++
pkg/query/executor/interface.go | 2 +-
pkg/query/logical/measure/measure_plan.go | 2 +-
.../logical/measure/measure_plan_aggregation.go | 17 +-
.../logical/measure/measure_plan_distributed.go | 137 +++++----
.../measure/measure_plan_distributed_test.go | 184 +++++++++--
pkg/query/logical/measure/measure_plan_groupby.go | 32 +-
.../measure/measure_plan_indexscan_local.go | 16 +-
pkg/query/logical/measure/measure_plan_top.go | 10 +-
pkg/query/logical/measure/measure_top.go | 6 +-
pkg/query/logical/measure/topn_plan_localscan.go | 10 +-
pkg/query/logical/measure/topn_plan_merge.go | 6 +-
pkg/query/model/model.go | 1 +
28 files changed, 713 insertions(+), 208 deletions(-)
diff --git a/api/data/data.go b/api/data/data.go
index 956c9e33..61c97897 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -35,6 +35,7 @@ var (
TopicStreamQuery.String(): TopicStreamQuery,
TopicMeasureWrite.String(): TopicMeasureWrite,
TopicMeasureQuery.String(): TopicMeasureQuery,
+ TopicInternalMeasureQuery.String(): TopicInternalMeasureQuery,
TopicTopNQuery.String(): TopicTopNQuery,
TopicPropertyDelete.String(): TopicPropertyDelete,
TopicPropertyQuery.String(): TopicPropertyQuery,
@@ -71,6 +72,9 @@ var (
TopicMeasureQuery: func() proto.Message {
return &measurev1.QueryRequest{}
},
+ TopicInternalMeasureQuery: func() proto.Message {
+ return &measurev1.InternalQueryRequest{}
+ },
TopicTopNQuery: func() proto.Message {
return &measurev1.TopNRequest{}
},
@@ -139,6 +143,9 @@ var (
TopicMeasureQuery: func() proto.Message {
return &measurev1.QueryResponse{}
},
+ TopicInternalMeasureQuery: func() proto.Message {
+ return &measurev1.InternalQueryResponse{}
+ },
TopicTopNQuery: func() proto.Message {
return &measurev1.TopNResponse{}
},
diff --git a/api/data/measure.go b/api/data/measure.go
index ea9eb8de..2bf2bc4d 100644
--- a/api/data/measure.go
+++ b/api/data/measure.go
@@ -40,6 +40,16 @@ var MeasureQueryKindVersion = common.KindVersion{
// TopicMeasureQuery is the measure query topic.
var TopicMeasureQuery = bus.BiTopic(MeasureQueryKindVersion.String())
+// InternalMeasureQueryKindVersion is the version tag of internal measure query kind.
+var InternalMeasureQueryKindVersion = common.KindVersion{
+ Version: "v1",
+ Kind: "internal-measure-query",
+}
+
+// TopicInternalMeasureQuery is the internal measure query topic.
+// Used for distributed query with shard information.
+var TopicInternalMeasureQuery = bus.BiTopic(InternalMeasureQueryKindVersion.String())
+
// TopNQueryKindVersion is the version tag of top-n query kind.
var TopNQueryKindVersion = common.KindVersion{
Version: "v1",
diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto
index 9bd0f045..a3d10adb 100644
--- a/api/proto/banyandb/measure/v1/query.proto
+++ b/api/proto/banyandb/measure/v1/query.proto
@@ -55,6 +55,31 @@ message QueryResponse {
common.v1.Trace trace = 2;
}
+// InternalDataPoint wraps DataPoint with shard information for internal use.
+// Used in distributed query to distinguish data from different shards.
+message InternalDataPoint {
+ // The actual data point
+ DataPoint data_point = 1;
+ // The shard id where this data point comes from
+ uint32 shard_id = 2;
+}
+
+// InternalQueryRequest is the internal request for distributed query.
+// Wraps QueryRequest for extensibility.
+message InternalQueryRequest {
+ // The actual query request
+ QueryRequest request = 1;
+}
+
+// InternalQueryResponse is the internal response for distributed query.
+// Contains shard information for proper deduplication.
+message InternalQueryResponse {
+ // data_points with shard information
+ repeated InternalDataPoint data_points = 1;
+ // trace contains the trace information of the query when trace is enabled
+ common.v1.Trace trace = 2;
+}
+
// QueryRequest is the request contract for query.
message QueryRequest {
// groups indicate where the data points are stored.
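
Note (not part of the patch): a minimal, hedged Go sketch of how the wrapper messages above are intended to flow, assuming the generated measurev1 bindings for InternalDataPoint, InternalQueryRequest and InternalQueryResponse. A data node annotates each DataPoint with the shard it was read from; the liaison later unwraps with GetDataPoint() once deduplication is done. wrapWithShard is a hypothetical helper used only for illustration.

    package main

    import (
        "fmt"

        measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
    )

    // wrapWithShard annotates plain DataPoints with the shard they came from,
    // producing the payload carried by InternalQueryResponse.
    func wrapWithShard(dps []*measurev1.DataPoint, shardID uint32) *measurev1.InternalQueryResponse {
        out := make([]*measurev1.InternalDataPoint, 0, len(dps))
        for _, dp := range dps {
            out = append(out, &measurev1.InternalDataPoint{DataPoint: dp, ShardId: shardID})
        }
        return &measurev1.InternalQueryResponse{DataPoints: out}
    }

    func main() {
        resp := wrapWithShard([]*measurev1.DataPoint{{Sid: 1, Version: 2}}, 3)
        // After deduplication the liaison unwraps the plain data point again.
        fmt.Println(resp.DataPoints[0].GetShardId(), resp.DataPoints[0].GetDataPoint().GetSid())
    }
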
diff --git a/api/proto/banyandb/measure/v1/rpc.proto b/api/proto/banyandb/measure/v1/rpc.proto
index 0f1d1bf5..5f547384 100644
--- a/api/proto/banyandb/measure/v1/rpc.proto
+++ b/api/proto/banyandb/measure/v1/rpc.proto
@@ -46,6 +46,10 @@ service MeasureService {
};
}
+ // InternalQuery is used for internal distributed query between liaison and data nodes.
+ // Returns InternalQueryResponse with shard information for proper deduplication.
+ rpc InternalQuery(InternalQueryRequest) returns (InternalQueryResponse);
+
rpc Write(stream WriteRequest) returns (stream WriteResponse);
rpc TopN(TopNRequest) returns (TopNResponse) {
option (google.api.http) = {
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index cbe8fe3e..3bfaaea4 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -160,7 +160,7 @@ func (p *measureQueryProcessor) Rev(ctx context.Context,
message bus.Message) (r
r++
current := mIterator.Current()
if len(current) > 0 {
- result = append(result, current[0])
+ result = append(result, current[0].GetDataPoint())
}
}
}()
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 08408aca..74dff151 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -141,6 +141,19 @@ func (s *segment[T, O]) Tables() (tt []T, cc []Cache) {
return tt, cc
}
+// TablesWithShardIDs returns tables with their corresponding shard IDs.
+func (s *segment[T, O]) TablesWithShardIDs() (tt []T, shardIDs []common.ShardID, cc []Cache) {
+ sLst := s.sLst.Load()
+ if sLst != nil {
+ for _, sh := range *sLst {
+ tt = append(tt, sh.table)
+ shardIDs = append(shardIDs, sh.id)
+ cc = append(cc, sh.shardCache)
+ }
+ }
+ return tt, shardIDs, cc
+}
+
func (s *segment[T, O]) incRef(ctx context.Context) error {
s.lastAccessed.Store(time.Now().UnixNano())
if atomic.LoadInt32(&s.refCount) <= 0 {
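
Note (illustration only, not part of the patch): the three slices returned by TablesWithShardIDs are index-aligned, so a caller can tag whatever it derives from tables[i] with shardIDs[i]. The sketch below uses simplified stand-in types (table, shardID, segment) rather than the real storage.Segment and TSTable generics.

    package main

    import "fmt"

    // Simplified stand-ins for common.ShardID, the TSTable type and storage.Segment.
    type shardID uint32

    type table struct{ name string }

    type segment struct {
        tables []table
        shards []shardID
    }

    // TablesWithShardIDs mirrors the new accessor: the returned slices share an index.
    func (s *segment) TablesWithShardIDs() ([]table, []shardID) {
        return s.tables, s.shards
    }

    func main() {
        seg := &segment{tables: []table{{"t0"}, {"t1"}}, shards: []shardID{0, 1}}
        tt, ids := seg.TablesWithShardIDs()
        for i := range tt {
            // Everything later read from tt[i] (parts, blocks) can inherit ids[i].
            fmt.Printf("table %s belongs to shard %d\n", tt[i].name, ids[i])
        }
    }
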
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index e4cb4d5b..5ac1b883 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -128,6 +128,7 @@ type Segment[T TSTable, O any] interface {
GetTimeRange() timestamp.TimeRange
CreateTSTableIfNotExist(shardID common.ShardID) (T, error)
Tables() ([]T, []Cache)
+ TablesWithShardIDs() ([]T, []common.ShardID, []Cache)
Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error)
IndexDB() IndexDB
}
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 24c9cd48..6cffb728 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -445,6 +445,7 @@ type blockCursor struct {
idx int
minTimestamp int64
maxTimestamp int64
+ shardID common.ShardID
}
func (bc *blockCursor) reset() {
@@ -497,6 +498,9 @@ func (bc *blockCursor) copyAllTo(r *model.MeasureResult,
storedIndexValue map[co
r.SID = bc.bm.seriesID
r.Timestamps = append(r.Timestamps, bc.timestamps[idx:offset]...)
r.Versions = append(r.Versions, bc.versions[idx:offset]...)
+ for i := 0; i < size; i++ {
+ r.ShardIDs = append(r.ShardIDs, bc.shardID)
+ }
if desc {
slices.Reverse(r.Timestamps)
slices.Reverse(r.Versions)
@@ -595,6 +599,7 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult,
storedIndexValue map[commo
r.SID = bc.bm.seriesID
r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx])
r.Versions = append(r.Versions, bc.versions[bc.idx])
+ r.ShardIDs = append(r.ShardIDs, bc.shardID)
var indexValue map[string]*modelv1.TagValue
if storedIndexValue != nil {
indexValue = storedIndexValue[r.SID]
diff --git a/banyand/measure/cache_benchmark_test.go b/banyand/measure/cache_benchmark_test.go
index 12c2f368..b8052cfa 100644
--- a/banyand/measure/cache_benchmark_test.go
+++ b/banyand/measure/cache_benchmark_test.go
@@ -330,7 +330,7 @@ func (m *measure) QueryWithCache(ctx context.Context, mqo
model.MeasureQueryOpti
}
}
- sids, tables, _, storedIndexValue, newTagProjection, err := m.searchSeriesList(ctx, series, mqo, segments)
+ sids, tables, _, _, storedIndexValue, newTagProjection, err := m.searchSeriesList(ctx, series, mqo, segments)
if err != nil {
return nil, err
}
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 7cb29232..844713fb 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -66,6 +66,7 @@ type part struct {
path string
primaryBlockMetadata []primaryBlockMetadata
partMetadata partMetadata
+ shardID common.ShardID
}
func (p *part) close() {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 22fa8aa4..495e0304 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -120,7 +120,7 @@ func (m *measure) Query(ctx context.Context, mqo
model.MeasureQueryOptions) (mqr
}
}
- sids, tables, caches, storedIndexValue, newTagProjection, err := m.searchSeriesList(ctx, series, mqo, segments)
+ sids, tables, tableShardIDs, caches, storedIndexValue, newTagProjection, err := m.searchSeriesList(ctx, series, mqo, segments)
if err != nil {
return nil, err
}
@@ -166,11 +166,16 @@ func (m *measure) Query(ctx context.Context, mqo
model.MeasureQueryOptions) (mqr
if s == nil {
continue
}
+ oldLen := len(parts)
parts, n = s.getParts(parts, caches[i], qo.minTimestamp, qo.maxTimestamp)
if n < 1 {
s.decRef()
continue
}
+ // Set shard ID for newly added parts
+ for j := oldLen; j < len(parts); j++ {
+         parts[j].shardID = tableShardIDs[i]
+ }
result.snapshots = append(result.snapshots, s)
}
@@ -212,7 +217,7 @@ type tagNameWithType struct {
func (m *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions,
	segments []storage.Segment[*tsTable, option],
-) (sl []common.SeriesID, tables []*tsTable, caches []storage.Cache, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
+) (sl []common.SeriesID, tables []*tsTable, tableShardIDs []common.ShardID, caches []storage.Cache, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
	newTagProjection []model.TagProjection, err error,
) {
var indexProjection []index.FieldKey
@@ -266,11 +271,12 @@ func (m *measure) searchSeriesList(ctx context.Context,
series []*pbv1.Series, m
Projection: indexProjection,
})
if err != nil {
- return nil, nil, nil, nil, nil, err
+ return nil, nil, nil, nil, nil, nil, err
}
if len(sd.SeriesList) > 0 {
- tt, cc := segments[i].Tables()
+ tt, shardIDs, cc := segments[i].TablesWithShardIDs()
tables = append(tables, tt...)
+ tableShardIDs = append(tableShardIDs, shardIDs...)
caches = append(caches, cc...)
// Create segResult for this segment
@@ -360,7 +366,7 @@ func (m *measure) searchSeriesList(ctx context.Context,
series []*pbv1.Series, m
}
}
- return sl, tables, caches, storedIndexValue, newTagProjection, nil
+ return sl, tables, tableShardIDs, caches, storedIndexValue, newTagProjection, nil
}
func (m *measure) buildStoredIndexValue(
@@ -515,6 +521,7 @@ func (m *measure) searchBlocks(ctx context.Context, result
*queryResult, sids []
bc := generateBlockCursor()
p := tstIter.piHeap[0]
bc.init(p.p, p.curBlock, qo)
+ bc.shardID = p.p.shardID
result.data = append(result.data, bc)
totalBlockBytes += bc.bm.uncompressedSizeBytes
if quota >= 0 && totalBlockBytes > uint64(quota) {
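
Note (illustration only): the "set shard ID for newly added parts" step above relies on the fact that only the entries appended by the latest getParts call belong to the current table. A tiny self-contained Go sketch of that pattern, with a trimmed-down part type:

    package main

    import "fmt"

    type part struct{ shardID uint32 }

    func main() {
        var parts []*part
        tableShardIDs := []uint32{0, 1}
        perTableParts := [][]*part{{{}, {}}, {{}}} // pretend getParts output per table

        for i := range perTableParts {
            oldLen := len(parts)
            parts = append(parts, perTableParts[i]...)
            // Only the entries appended for this table inherit its shard ID.
            for j := oldLen; j < len(parts); j++ {
                parts[j].shardID = tableShardIDs[i]
            }
        }
        for _, p := range parts {
            fmt.Println(p.shardID) // prints 0 0 1
        }
    }
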
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 18851e8d..66ab6077 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
+ "github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
@@ -1295,7 +1296,8 @@ func TestQueryResult(t *testing.T) {
}
if diff := cmp.Diff(got, tt.want,
- protocmp.IgnoreUnknown(),
protocmp.Transform()); diff != "" {
+ protocmp.IgnoreUnknown(),
protocmp.Transform(),
+
cmpopts.IgnoreFields(model.MeasureResult{}, "ShardIDs")); diff != "" {
t.Errorf("Unexpected []pbv1.Result
(-got +want):\n%s", diff)
}
}
@@ -1512,7 +1514,8 @@ func TestQueryResult_QuotaExceeded(t *testing.T) {
got = append(got, *r)
}
if diff := cmp.Diff(got, tt.want,
- protocmp.IgnoreUnknown(),
protocmp.Transform()); diff != "" {
+ protocmp.IgnoreUnknown(), protocmp.Transform(),
+ cmpopts.IgnoreFields(model.MeasureResult{},
"ShardIDs")); diff != "" {
t.Errorf("Unexpected []pbv1.Result (-got
+want):\n%s", diff)
}
})
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 388ac0b5..cbe7d783 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -54,6 +54,7 @@ const (
var (
_ bus.MessageListener = (*streamQueryProcessor)(nil)
_ bus.MessageListener = (*measureQueryProcessor)(nil)
+ _ bus.MessageListener = (*measureInternalQueryProcessor)(nil)
_ bus.MessageListener = (*traceQueryProcessor)(nil)
_ executor.TraceExecutionContext = trace.Trace(nil)
)
@@ -161,6 +162,129 @@ type measureQueryProcessor struct {
*bus.UnImplementedHealthyListener
}
+// measureExecutionContext holds the common execution context for measure queries.
+type measureExecutionContext struct {
+ ml *logger.Logger
+ ecc []executor.MeasureExecutionContext
+ metadata []*commonv1.Metadata
+ schemas []logical.Schema
+}
+
+// buildMeasureContext builds the execution context for a measure query.
+func buildMeasureContext(measureService measure.Service, log *logger.Logger, queryCriteria *measurev1.QueryRequest, logPrefix string) (*measureExecutionContext, error) {
+ var metadata []*commonv1.Metadata
+ var schemas []logical.Schema
+ var ecc []executor.MeasureExecutionContext
+ for i := range queryCriteria.Groups {
+ meta := &commonv1.Metadata{
+ Name: queryCriteria.Name,
+ Group: queryCriteria.Groups[i],
+ }
+ ec, ecErr := measureService.Measure(meta)
+ if ecErr != nil {
+ return nil, fmt.Errorf("fail to get execution context
for measure %s: %w", meta.GetName(), ecErr)
+ }
+ s, schemaErr := logical_measure.BuildSchema(ec.GetSchema(),
ec.GetIndexRules())
+ if schemaErr != nil {
+ return nil, fmt.Errorf("fail to build schema for
measure %s: %w", meta.GetName(), schemaErr)
+ }
+ ecc = append(ecc, ec)
+ schemas = append(schemas, s)
+ metadata = append(metadata, meta)
+ }
+ ml := log.Named(logPrefix, queryCriteria.Groups[0], queryCriteria.Name)
+ return &measureExecutionContext{
+ metadata: metadata,
+ schemas: schemas,
+ ecc: ecc,
+ ml: ml,
+ }, nil
+}
+
+// executeMeasurePlan executes the measure query plan and returns the iterator.
+func executeMeasurePlan(ctx context.Context, queryCriteria *measurev1.QueryRequest, mctx *measureExecutionContext) (executor.MIterator, logical.Plan, error) {
+ plan, planErr := logical_measure.Analyze(queryCriteria, mctx.metadata,
mctx.schemas, mctx.ecc)
+ if planErr != nil {
+ return nil, nil, fmt.Errorf("fail to analyze the query request
for measure %s: %w", queryCriteria.GetName(), planErr)
+ }
+ if e := mctx.ml.Debug(); e.Enabled() {
+ e.Str("plan", plan.String()).Msg("query plan")
+ }
+ mIterator, execErr := plan.(executor.MeasureExecutable).Execute(ctx)
+ if execErr != nil {
+ mctx.ml.Error().Err(execErr).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to query")
+ return nil, nil, fmt.Errorf("fail to execute the query plan for
measure %s: %w", queryCriteria.GetName(), execErr)
+ }
+ return mIterator, plan, nil
+}
+
+// extractTagValuesFromInternalDataPoints extracts tag values from InternalDataPoints for RewriteAggTopNResult.
+func extractTagValuesFromInternalDataPoints(dataPoints []*measurev1.InternalDataPoint, groupByTags []string) map[string][]*modelv1.TagValue {
+ tagValueMap := make(map[string][]*modelv1.TagValue)
+ for _, idp := range dataPoints {
+ if idp.DataPoint != nil {
+ extractTagValuesFromDataPoint(idp.DataPoint,
groupByTags, tagValueMap)
+ }
+ }
+ return tagValueMap
+}
+
+// collectInternalDataPoints collects InternalDataPoints from the iterator.
+func collectInternalDataPoints(mIterator executor.MIterator) []*measurev1.InternalDataPoint {
+ result := make([]*measurev1.InternalDataPoint, 0)
+ for mIterator.Next() {
+ current := mIterator.Current()
+ if len(current) > 0 {
+ result = append(result, current[0])
+ }
+ }
+ return result
+}
+
+// extractTagValuesFromDataPoints extracts tag values from DataPoints for RewriteAggTopNResult.
+func extractTagValuesFromDataPoints(dataPoints []*measurev1.DataPoint, groupByTags []string) map[string][]*modelv1.TagValue {
+ tagValueMap := make(map[string][]*modelv1.TagValue)
+ for _, dp := range dataPoints {
+ extractTagValuesFromDataPoint(dp, groupByTags, tagValueMap)
+ }
+ return tagValueMap
+}
+
+// extractTagValuesFromDataPoint extracts tag values from a single DataPoint and appends to tagValueMap.
+func extractTagValuesFromDataPoint(dp *measurev1.DataPoint, groupByTags []string, tagValueMap map[string][]*modelv1.TagValue) {
+ for _, tagFamily := range dp.GetTagFamilies() {
+ for _, tag := range tagFamily.GetTags() {
+ tagName := tag.GetKey()
+ if len(groupByTags) == 0 ||
slices.Contains(groupByTags, tagName) {
+ tagValueMap[tagName] =
append(tagValueMap[tagName], tag.GetValue())
+ }
+ }
+ }
+}
+
+// getGroupByTags extracts group by tag names from query criteria.
+func getGroupByTags(queryCriteria *measurev1.QueryRequest) []string {
+ groupByTags := make([]string, 0)
+ if queryCriteria.GetGroupBy() != nil {
+ for _, tagFamily := range
queryCriteria.GetGroupBy().GetTagProjection().GetTagFamilies() {
+ groupByTags = append(groupByTags,
tagFamily.GetTags()...)
+ }
+ }
+ return groupByTags
+}
+
+// buildRewriteQueryCriteria builds the rewrite query criteria for RewriteAggTopNResult.
+func buildRewriteQueryCriteria(queryCriteria *measurev1.QueryRequest, rewrittenCriteria *modelv1.Criteria) *measurev1.QueryRequest {
+ return &measurev1.QueryRequest{
+ Groups: queryCriteria.Groups,
+ Name: queryCriteria.Name,
+ TimeRange: queryCriteria.TimeRange,
+ Criteria: rewrittenCriteria,
+ TagProjection: queryCriteria.TagProjection,
+ FieldProjection: queryCriteria.FieldProjection,
+ }
+}
+
func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message)
(resp bus.Message) {
queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
n := time.Now()
@@ -182,36 +306,14 @@ func (p *measureQueryProcessor) Rev(ctx context.Context,
message bus.Message) (r
if len(result) == 0 {
return
}
- groupByTags := make([]string, 0)
- if queryCriteria.GetGroupBy() != nil {
- for _, tagFamily := range
queryCriteria.GetGroupBy().GetTagProjection().GetTagFamilies() {
- groupByTags = append(groupByTags,
tagFamily.GetTags()...)
- }
- }
- tagValueMap := make(map[string][]*modelv1.TagValue)
- for _, dp := range result {
- for _, tagFamily := range dp.GetTagFamilies() {
- for _, tag := range tagFamily.GetTags() {
- tagName := tag.GetKey()
- if len(groupByTags) == 0 ||
slices.Contains(groupByTags, tagName) {
- tagValueMap[tagName] =
append(tagValueMap[tagName], tag.GetValue())
- }
- }
- }
- }
- rewriteCriteria, err := rewriteCriteria(tagValueMap)
- if err != nil {
- p.log.Error().Err(err).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
+ groupByTags := getGroupByTags(queryCriteria)
+ tagValueMap := extractTagValuesFromDataPoints(result,
groupByTags)
+ rewrittenCriteria, rewriteErr := rewriteCriteria(tagValueMap)
+ if rewriteErr != nil {
+ p.log.Error().Err(rewriteErr).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
return
}
- rewriteQueryCriteria := &measurev1.QueryRequest{
- Groups: queryCriteria.Groups,
- Name: queryCriteria.Name,
- TimeRange: queryCriteria.TimeRange,
- Criteria: rewriteCriteria,
- TagProjection: queryCriteria.TagProjection,
- FieldProjection: queryCriteria.FieldProjection,
- }
+ rewriteQueryCriteria :=
buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria)
resp = p.executeQuery(ctx, rewriteQueryCriteria)
dataPoints, handleErr := handleResponse(resp)
if handleErr != nil {
@@ -226,43 +328,32 @@ func (p *measureQueryProcessor) executeQuery(ctx
context.Context, queryCriteria
n := time.Now()
now := n.UnixNano()
defer func() {
- if err := recover(); err != nil {
- p.log.Error().Interface("err", err).RawJSON("req",
logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
+ if recoverErr := recover(); recoverErr != nil {
+ p.log.Error().Interface("err",
recoverErr).RawJSON("req", logger.Proto(queryCriteria)).Str("stack",
string(debug.Stack())).Msg("panic")
resp =
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
}
}()
- var metadata []*commonv1.Metadata
- var schemas []logical.Schema
- var ecc []executor.MeasureExecutionContext
- for i := range queryCriteria.Groups {
- meta := &commonv1.Metadata{
- Name: queryCriteria.Name,
- Group: queryCriteria.Groups[i],
- }
- ec, err := p.measureService.Measure(meta)
- if err != nil {
- resp = bus.NewMessage(bus.MessageID(now),
common.NewError("fail to get execution context for measure %s: %v",
meta.GetName(), err))
- return
- }
- s, err := logical_measure.BuildSchema(ec.GetSchema(),
ec.GetIndexRules())
- if err != nil {
- resp = bus.NewMessage(bus.MessageID(now),
common.NewError("fail to build schema for measure %s: %v", meta.GetName(), err))
- return
- }
- ecc = append(ecc, ec)
- schemas = append(schemas, s)
- metadata = append(metadata, meta)
+
+ mctx, buildErr := buildMeasureContext(p.measureService, p.log,
queryCriteria, "measure")
+ if buildErr != nil {
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v",
buildErr))
+ return
}
- ml := p.log.Named("measure", queryCriteria.Groups[0],
queryCriteria.Name)
- if e := ml.Debug(); e.Enabled() {
+ if e := mctx.ml.Debug(); e.Enabled() {
e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received a
query event")
}
- plan, err := logical_measure.Analyze(queryCriteria, metadata, schemas,
ecc)
- if err != nil {
- resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for measure %s: %v", queryCriteria.GetName(), err))
+ mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, mctx)
+ if execErr != nil {
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v",
execErr))
return
}
+ defer func() {
+ if closeErr := mIterator.Close(); closeErr != nil {
+ mctx.ml.Error().Err(closeErr).Dur("latency",
time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close
the query plan")
+ }
+ }()
+
var tracer *query.Tracer
var span *query.Span
if queryCriteria.Trace {
@@ -286,26 +377,7 @@ func (p *measureQueryProcessor) executeQuery(ctx
context.Context, queryCriteria
}()
}
- if e := ml.Debug(); e.Enabled() {
- e.Str("plan", plan.String()).Msg("query plan")
- }
-
- mIterator, err := plan.(executor.MeasureExecutable).Execute(ctx)
- if err != nil {
- ml.Error().Err(err).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to query")
- resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to execute the query plan for measure %s: %v", queryCriteria.GetName(), err))
- return
- }
- defer func() {
- if err = mIterator.Close(); err != nil {
- ml.Error().Err(err).Dur("latency",
time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close
the query plan")
- if span != nil {
- span.Error(fmt.Errorf("fail to close the query
plan: %w", err))
- }
- }
- }()
-
- result := make([]*measurev1.DataPoint, 0)
+ var result []*measurev1.DataPoint
func() {
var r int
if tracer != nil {
@@ -320,12 +392,13 @@ func (p *measureQueryProcessor) executeQuery(ctx
context.Context, queryCriteria
r++
current := mIterator.Current()
if len(current) > 0 {
- result = append(result, current[0])
+ result = append(result, current[0].GetDataPoint())
}
}
}()
+
qr := &measurev1.QueryResponse{DataPoints: result}
- if e := ml.Debug(); e.Enabled() {
+ if e := mctx.ml.Debug(); e.Enabled() {
e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
}
resp = bus.NewMessage(bus.MessageID(now), qr)
@@ -350,6 +423,118 @@ func handleResponse(resp bus.Message)
([]*measurev1.DataPoint, *common.Error) {
}
}
+type measureInternalQueryProcessor struct {
+ measureService measure.Service
+ *queryService
+ *bus.UnImplementedHealthyListener
+}
+
+func (p *measureInternalQueryProcessor) Rev(ctx context.Context, message
bus.Message) (resp bus.Message) {
+ internalRequest, ok := message.Data().(*measurev1.InternalQueryRequest)
+ n := time.Now()
+ now := n.UnixNano()
+ if !ok {
+ resp = bus.NewMessage(bus.MessageID(now),
common.NewError("invalid event data type"))
+ return
+ }
+ queryCriteria := internalRequest.GetRequest()
+ if queryCriteria == nil {
+ resp = bus.NewMessage(bus.MessageID(now),
common.NewError("query request is nil"))
+ return
+ }
+ // Handle RewriteAggTopNResult: double the top number for initial query
+ if queryCriteria.RewriteAggTopNResult {
+ queryCriteria.Top.Number *= 2
+ }
+ defer func() {
+ if recoverErr := recover(); recoverErr != nil {
+ p.log.Error().Interface("err",
recoverErr).RawJSON("req", logger.Proto(queryCriteria)).Str("stack",
string(debug.Stack())).Msg("panic")
+ resp =
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
+ }
+ }()
+
+ mctx, buildErr := buildMeasureContext(p.measureService, p.log,
queryCriteria, "internal-measure")
+ if buildErr != nil {
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v",
buildErr))
+ return
+ }
+ if e := mctx.ml.Debug(); e.Enabled() {
+ e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received an
internal query event")
+ }
+
+ mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, mctx)
+ if execErr != nil {
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v",
execErr))
+ return
+ }
+ defer func() {
+ if closeErr := mIterator.Close(); closeErr != nil {
+ mctx.ml.Error().Err(closeErr).Dur("latency",
time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close
the query plan")
+ }
+ }()
+
+ var tracer *query.Tracer
+ var span *query.Span
+ if queryCriteria.Trace {
+ tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+ span, ctx = tracer.StartSpan(ctx, "data-%s",
p.queryService.nodeID)
+ span.Tag("plan", plan.String())
+ defer func() {
+ respData := resp.Data()
+ switch d := respData.(type) {
+ case *measurev1.InternalQueryResponse:
+ span.Tag("resp_count", fmt.Sprintf("%d",
len(d.DataPoints)))
+ d.Trace = tracer.ToProto()
+ span.Stop()
+ case *common.Error:
+ span.Error(errors.New(d.Error()))
+ span.Stop()
+ resp = bus.NewMessage(bus.MessageID(now),
&measurev1.InternalQueryResponse{Trace: tracer.ToProto()})
+ default:
+ panic("unexpected data type")
+ }
+ }()
+ }
+
+ result := collectInternalDataPoints(mIterator)
+
+ // Handle RewriteAggTopNResult: rewrite query to get original data with Timestamp
+ if queryCriteria.RewriteAggTopNResult && len(result) > 0 {
+ groupByTags := getGroupByTags(queryCriteria)
+ tagValueMap := extractTagValuesFromInternalDataPoints(result,
groupByTags)
+ rewrittenCriteria, rewriteErr := rewriteCriteria(tagValueMap)
+ if rewriteErr != nil {
+ mctx.ml.Error().Err(rewriteErr).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
+ } else {
+ rewriteQueryCriteria :=
buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria)
+ rewriteIterator, _, rewriteExecErr :=
executeMeasurePlan(ctx, rewriteQueryCriteria, mctx)
+ if rewriteExecErr != nil {
+
mctx.ml.Error().Err(rewriteExecErr).RawJSON("req",
logger.Proto(rewriteQueryCriteria)).Msg("fail to execute the rewrite query
plan")
+ } else {
+ defer func() {
+ if closeErr := rewriteIterator.Close();
closeErr != nil {
+
mctx.ml.Error().Err(closeErr).Msg("fail to close the rewrite query plan")
+ }
+ }()
+ result =
collectInternalDataPoints(rewriteIterator)
+ }
+ }
+ }
+
+ qr := &measurev1.InternalQueryResponse{DataPoints: result}
+ if e := mctx.ml.Debug(); e.Enabled() {
+ e.RawJSON("ret", logger.Proto(qr)).Msg("got an internal measure
response")
+ }
+ resp = bus.NewMessage(bus.MessageID(now), qr)
+ if !queryCriteria.Trace && p.slowQuery > 0 {
+ latency := time.Since(n)
+ if latency > p.slowQuery {
+ p.log.Warn().Dur("latency", latency).RawJSON("req",
logger.Proto(queryCriteria)).Int("resp_count", len(result)).Msg("internal
measure slow query")
+ }
+ }
+ return
+}
+
func rewriteCriteria(tagValueMap map[string][]*modelv1.TagValue)
(*modelv1.Criteria, error) {
var tagConditions []*modelv1.Condition
for tagName, tagValues := range tagValueMap {
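
Note (illustration only): the RewriteAggTopNResult handling above is a two-phase flow: double Top.Number for the first pass, collect the group-by tag values that survive, rewrite the criteria to just those groups, then run the plan again to fetch the original rows. The Go sketch below stubs the execution out; runQuery, dataPoint and the sample values are hypothetical stand-ins for executeMeasurePlan/collectInternalDataPoints and real measure data.

    package main

    import "fmt"

    // dataPoint is a toy stand-in for an aggregated measurev1.DataPoint.
    type dataPoint struct {
        group string // group-by tag value
        value int
    }

    // runQuery pretends to execute the (possibly rewritten) plan on this node:
    // it returns at most topN rows, optionally restricted to selected groups.
    func runQuery(topN int, onlyGroups map[string]bool) []dataPoint {
        all := []dataPoint{{"a", 9}, {"b", 7}, {"c", 5}, {"d", 3}}
        var out []dataPoint
        for _, dp := range all {
            if onlyGroups != nil && !onlyGroups[dp.group] {
                continue
            }
            if len(out) < topN {
                out = append(out, dp)
            }
        }
        return out
    }

    func main() {
        topN := 1

        // Phase 1: over-fetch by doubling the top number, as the processor does.
        first := runQuery(topN*2, nil)

        // Phase 2: rewrite the criteria to the group keys seen in phase 1 and
        // query again to retrieve the original rows for those groups.
        groups := make(map[string]bool, len(first))
        for _, dp := range first {
            groups[dp.group] = true
        }
        final := runQuery(topN*2, groups)
        fmt.Println(final) // [{a 9} {b 7}]
    }
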
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 90a49bbc..33556434 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -176,7 +176,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
r++
current := mIterator.Current()
if len(current) > 0 {
- result = append(result, current[0])
+ result = append(result, current[0].GetDataPoint())
}
}
}()
diff --git a/banyand/query/query.go b/banyand/query/query.go
index b5e1451a..4985f47e 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -42,6 +42,7 @@ type queryService struct {
log *logger.Logger
sqp *streamQueryProcessor
mqp *measureQueryProcessor
+ imqp *measureInternalQueryProcessor
nqp *topNQueryProcessor
tqp *traceQueryProcessor
nodeID string
@@ -61,6 +62,11 @@ func NewService(_ context.Context, streamService
stream.Service, measureService
measureService: measureService,
queryService: svc,
}
+ // internal measure query processor for distributed query
+ svc.imqp = &measureInternalQueryProcessor{
+ measureService: measureService,
+ queryService: svc,
+ }
// stream query processor
svc.sqp = &streamQueryProcessor{
streamService: streamService,
@@ -94,6 +100,7 @@ func (q *queryService) PreRun(ctx context.Context) error {
return multierr.Combine(
q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
+ q.pipeline.Subscribe(data.TopicInternalMeasureQuery, q.imqp),
q.pipeline.Subscribe(data.TopicTopNQuery, q.nqp),
q.pipeline.Subscribe(data.TopicTraceQuery, q.tqp),
)
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 21081768..814da7f7 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -40,6 +40,9 @@
- [banyandb/measure/v1/query.proto](#banyandb_measure_v1_query-proto)
- [DataPoint](#banyandb-measure-v1-DataPoint)
- [DataPoint.Field](#banyandb-measure-v1-DataPoint-Field)
+ - [InternalDataPoint](#banyandb-measure-v1-InternalDataPoint)
+ - [InternalQueryRequest](#banyandb-measure-v1-InternalQueryRequest)
+ - [InternalQueryResponse](#banyandb-measure-v1-InternalQueryResponse)
- [QueryRequest](#banyandb-measure-v1-QueryRequest)
- [QueryRequest.Aggregation](#banyandb-measure-v1-QueryRequest-Aggregation)
- [QueryRequest.FieldProjection](#banyandb-measure-v1-QueryRequest-FieldProjection)
@@ -907,6 +910,56 @@ DataPoint is stored in Measures
+<a name="banyandb-measure-v1-InternalDataPoint"></a>
+
+### InternalDataPoint
+InternalDataPoint wraps DataPoint with shard information for internal use.
+Used in distributed query to distinguish data from different shards.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| data_point | [DataPoint](#banyandb-measure-v1-DataPoint) | | The actual data point |
+| shard_id | [uint32](#uint32) | | The shard id where this data point comes from |
+
+
+
+
+
+
+<a name="banyandb-measure-v1-InternalQueryRequest"></a>
+
+### InternalQueryRequest
+InternalQueryRequest is the internal request for distributed query.
+Wraps QueryRequest for extensibility.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| request | [QueryRequest](#banyandb-measure-v1-QueryRequest) | | The actual query request |
+
+
+
+
+
+
+<a name="banyandb-measure-v1-InternalQueryResponse"></a>
+
+### InternalQueryResponse
+InternalQueryResponse is the internal response for distributed query.
+Contains shard information for proper deduplication.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| data_points | [InternalDataPoint](#banyandb-measure-v1-InternalDataPoint) | repeated | data_points with shard information |
+| trace | [banyandb.common.v1.Trace](#banyandb-common-v1-Trace) | | trace contains the trace information of the query when trace is enabled |
+
+
+
+
+
+
<a name="banyandb-measure-v1-QueryRequest"></a>
### QueryRequest
@@ -4962,6 +5015,7 @@ WriteResponse is the response contract for write
| Method Name | Request Type | Response Type | Description |
| ----------- | ------------ | ------------- | ------------|
| Query | [QueryRequest](#banyandb-measure-v1-QueryRequest) | [QueryResponse](#banyandb-measure-v1-QueryResponse) | |
+| InternalQuery | [InternalQueryRequest](#banyandb-measure-v1-InternalQueryRequest) | [InternalQueryResponse](#banyandb-measure-v1-InternalQueryResponse) | InternalQuery is used for internal distributed query between liaison and data nodes. Returns InternalQueryResponse with shard information for proper deduplication. |
| Write | [WriteRequest](#banyandb-measure-v1-WriteRequest) stream | [WriteResponse](#banyandb-measure-v1-WriteResponse) stream | |
| TopN | [TopNRequest](#banyandb-measure-v1-TopNRequest) | [TopNResponse](#banyandb-measure-v1-TopNResponse) | |
| DeleteExpiredSegments | [DeleteExpiredSegmentsRequest](#banyandb-measure-v1-DeleteExpiredSegmentsRequest) | [DeleteExpiredSegmentsResponse](#banyandb-measure-v1-DeleteExpiredSegmentsResponse) | |
diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go
index 9953f538..5aef8b14 100644
--- a/pkg/query/executor/interface.go
+++ b/pkg/query/executor/interface.go
@@ -49,7 +49,7 @@ type MeasureExecutionContext interface {
type MIterator interface {
Next() bool
- Current() []*measurev1.DataPoint
+ Current() []*measurev1.InternalDataPoint
Close() error
}
diff --git a/pkg/query/logical/measure/measure_plan.go b/pkg/query/logical/measure/measure_plan.go
index d9bc6a68..b6e6a0a3 100644
--- a/pkg/query/logical/measure/measure_plan.go
+++ b/pkg/query/logical/measure/measure_plan.go
@@ -98,7 +98,7 @@ func (l *limitIterator) Close() error {
return l.inner.Close()
}
-func (l *limitIterator) Current() []*measurev1.DataPoint {
+func (l *limitIterator) Current() []*measurev1.InternalDataPoint {
return l.inner.Current()
}
diff --git a/pkg/query/logical/measure/measure_plan_aggregation.go b/pkg/query/logical/measure/measure_plan_aggregation.go
index e304119e..ec2b06a6 100644
--- a/pkg/query/logical/measure/measure_plan_aggregation.go
+++ b/pkg/query/logical/measure/measure_plan_aggregation.go
@@ -160,14 +160,16 @@ func (ami *aggGroupIterator[N]) Next() bool {
return ami.prev.Next()
}
-func (ami *aggGroupIterator[N]) Current() []*measurev1.DataPoint {
+func (ami *aggGroupIterator[N]) Current() []*measurev1.InternalDataPoint {
if ami.err != nil {
return nil
}
ami.aggrFunc.Reset()
group := ami.prev.Current()
var resultDp *measurev1.DataPoint
- for _, dp := range group {
+ var shardID uint32
+ for _, idp := range group {
+ dp := idp.GetDataPoint()
value := dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
GetValue()
v, err := aggregation.FromFieldValue[N](value)
@@ -179,6 +181,7 @@ func (ami *aggGroupIterator[N]) Current()
[]*measurev1.DataPoint {
if resultDp != nil {
continue
}
+ shardID = idp.GetShardId()
resultDp = &measurev1.DataPoint{
TagFamilies: dp.TagFamilies,
}
@@ -197,7 +200,7 @@ func (ami *aggGroupIterator[N]) Current()
[]*measurev1.DataPoint {
Value: val,
},
}
- return []*measurev1.DataPoint{resultDp}
+ return []*measurev1.InternalDataPoint{{DataPoint: resultDp, ShardId: shardID}}
}
func (ami *aggGroupIterator[N]) Close() error {
@@ -232,7 +235,8 @@ func (ami *aggAllIterator[N]) Next() bool {
var resultDp *measurev1.DataPoint
for ami.prev.Next() {
group := ami.prev.Current()
- for _, dp := range group {
+ for _, idp := range group {
+ dp := idp.GetDataPoint()
value :=
dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
GetValue()
v, err := aggregation.FromFieldValue[N](value)
@@ -267,11 +271,12 @@ func (ami *aggAllIterator[N]) Next() bool {
return true
}
-func (ami *aggAllIterator[N]) Current() []*measurev1.DataPoint {
+func (ami *aggAllIterator[N]) Current() []*measurev1.InternalDataPoint {
if ami.result == nil {
return nil
}
- return []*measurev1.DataPoint{ami.result}
+ // For aggregation across all data, shard ID is not applicable
+ return []*measurev1.InternalDataPoint{{DataPoint: ami.result, ShardId: 0}}
}
func (ami *aggAllIterator[N]) Close() error {
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index e401b970..d81d3f68 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -27,6 +27,7 @@ import (
"go.uber.org/multierr"
"google.golang.org/protobuf/proto"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
@@ -271,37 +272,38 @@ func (t *distributedPlan) Execute(ctx context.Context)
(mi executor.MIterator, e
}
}()
}
-	ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery,
-		bus.NewMessageWithNodeSelectors(bus.MessageID(dctx.TimeRange().Begin.Nanos), dctx.NodeSelectors(), dctx.TimeRange(), queryRequest))
-	if err != nil {
-		return nil, err
+	internalRequest := &measurev1.InternalQueryRequest{Request: queryRequest}
+	ff, broadcastErr := dctx.Broadcast(defaultQueryTimeout, data.TopicInternalMeasureQuery,
+		bus.NewMessageWithNodeSelectors(bus.MessageID(dctx.TimeRange().Begin.Nanos), dctx.NodeSelectors(), dctx.TimeRange(), internalRequest))
+	if broadcastErr != nil {
+		return nil, broadcastErr
}
var see []sort.Iterator[*comparableDataPoint]
- var pushedDownAggDps []*measurev1.DataPoint
+ var pushedDownAggDps []*measurev1.InternalDataPoint
var responseCount int
var dataPointCount int
for _, f := range ff {
if m, getErr := f.Get(); getErr != nil {
err = multierr.Append(err, getErr)
} else {
- d := m.Data()
- if d == nil {
- continue
- }
- resp := d.(*measurev1.QueryResponse)
- responseCount++
- if span != nil {
- span.AddSubTrace(resp.Trace)
- }
- if t.needCompletePushDownAgg {
- pushedDownAggDps = append(pushedDownAggDps,
resp.DataPoints...)
- dataPointCount += len(resp.DataPoints)
- continue
+ switch d := m.Data().(type) {
+ case *measurev1.InternalQueryResponse:
+ responseCount++
+ if span != nil {
+ span.AddSubTrace(d.Trace)
+ }
+ if t.needCompletePushDownAgg {
+ pushedDownAggDps =
append(pushedDownAggDps, d.DataPoints...)
+ dataPointCount += len(d.DataPoints)
+ continue
+ }
+ dataPointCount += len(d.DataPoints)
+ see = append(see,
+ newSortableElements(d.DataPoints,
+ t.sortByTime, t.sortTagSpec))
+ case *common.Error:
+ err = multierr.Append(err, fmt.Errorf("data
node error: %s", d.Error()))
}
- dataPointCount += len(resp.DataPoints)
- see = append(see,
- newSortableElements(resp.DataPoints,
- t.sortByTime, t.sortTagSpec))
}
}
if span != nil {
@@ -309,7 +311,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi
executor.MIterator, e
span.Tagf("data_point_count", "%d", dataPointCount)
}
if t.needCompletePushDownAgg {
- deduplicatedDps, dedupErr :=
deduplicateAggregatedDataPoints(pushedDownAggDps, t.groupByTagsRefs)
+ deduplicatedDps, dedupErr :=
deduplicateAggregatedDataPointsWithShard(pushedDownAggDps, t.groupByTagsRefs)
if dedupErr != nil {
return nil, multierr.Append(err, dedupErr)
}
@@ -347,25 +349,26 @@ func (t *distributedPlan) Limit(maxVal int) {
var _ sort.Comparable = (*comparableDataPoint)(nil)
type comparableDataPoint struct {
- *measurev1.DataPoint
+ *measurev1.InternalDataPoint
sortField []byte
}
-func newComparableElement(e *measurev1.DataPoint, sortByTime bool, sortTagSpec
logical.TagSpec) (*comparableDataPoint, error) {
+func newComparableElement(idp *measurev1.InternalDataPoint, sortByTime bool,
sortTagSpec logical.TagSpec) (*comparableDataPoint, error) {
+ dp := idp.GetDataPoint()
var sortField []byte
if sortByTime {
- sortField =
convert.Uint64ToBytes(uint64(e.Timestamp.AsTime().UnixNano()))
+ sortField =
convert.Uint64ToBytes(uint64(dp.Timestamp.AsTime().UnixNano()))
} else {
var err error
- sortField, err =
pbv1.MarshalTagValue(e.TagFamilies[sortTagSpec.TagFamilyIdx].Tags[sortTagSpec.TagIdx].Value)
+ sortField, err =
pbv1.MarshalTagValue(dp.TagFamilies[sortTagSpec.TagFamilyIdx].Tags[sortTagSpec.TagIdx].Value)
if err != nil {
return nil, err
}
}
return &comparableDataPoint{
- DataPoint: e,
- sortField: sortField,
+ InternalDataPoint: idp,
+ sortField: sortField,
}, nil
}
@@ -377,13 +380,13 @@ var _ sort.Iterator[*comparableDataPoint] =
(*sortableElements)(nil)
type sortableElements struct {
cur *comparableDataPoint
- dataPoints []*measurev1.DataPoint
+ dataPoints []*measurev1.InternalDataPoint
sortTagSpec logical.TagSpec
index int
isSortByTime bool
}
-func newSortableElements(dataPoints []*measurev1.DataPoint, isSortByTime bool,
sortTagSpec logical.TagSpec) *sortableElements {
+func newSortableElements(dataPoints []*measurev1.InternalDataPoint,
isSortByTime bool, sortTagSpec logical.TagSpec) *sortableElements {
return &sortableElements{
dataPoints: dataPoints,
isSortByTime: isSortByTime,
@@ -396,8 +399,8 @@ func (*sortableElements) Close() error {
}
func (s *sortableElements) Next() bool {
- return s.iter(func(e *measurev1.DataPoint) (*comparableDataPoint,
error) {
- return newComparableElement(e, s.isSortByTime, s.sortTagSpec)
+ return s.iter(func(idp *measurev1.InternalDataPoint)
(*comparableDataPoint, error) {
+ return newComparableElement(idp, s.isSortByTime, s.sortTagSpec)
})
}
@@ -405,7 +408,7 @@ func (s *sortableElements) Val() *comparableDataPoint {
return s.cur
}
-func (s *sortableElements) iter(fn func(*measurev1.DataPoint)
(*comparableDataPoint, error)) bool {
+func (s *sortableElements) iter(fn func(*measurev1.InternalDataPoint)
(*comparableDataPoint, error)) bool {
if s.index >= len(s.dataPoints) {
return false
}
@@ -423,8 +426,8 @@ var _ executor.MIterator = (*sortedMIterator)(nil)
type sortedMIterator struct {
sort.Iterator[*comparableDataPoint]
data *list.List
- uniqueData map[uint64]*measurev1.DataPoint
- cur *measurev1.DataPoint
+ uniqueData map[uint64]*measurev1.InternalDataPoint
+ cur *measurev1.InternalDataPoint
initialized bool
closed bool
}
@@ -439,7 +442,7 @@ func (s *sortedMIterator) init() {
return
}
s.data = list.New()
- s.uniqueData = make(map[uint64]*measurev1.DataPoint)
+ s.uniqueData = make(map[uint64]*measurev1.InternalDataPoint)
s.loadDps()
}
@@ -453,9 +456,9 @@ func (s *sortedMIterator) Next() bool {
return false
}
}
- dp := s.data.Front()
- s.data.Remove(dp)
- s.cur = dp.Value.(*measurev1.DataPoint)
+ idp := s.data.Front()
+ s.data.Remove(idp)
+ s.cur = idp.Value.(*measurev1.InternalDataPoint)
return true
}
@@ -467,7 +470,7 @@ func (s *sortedMIterator) loadDps() {
delete(s.uniqueData, k)
}
first := s.Iterator.Val()
- s.uniqueData[hashDataPoint(first.DataPoint)] = first.DataPoint
+ s.uniqueData[hashDataPoint(first.GetDataPoint())] =
first.InternalDataPoint
for {
if !s.Iterator.Next() {
s.closed = true
@@ -475,13 +478,13 @@ func (s *sortedMIterator) loadDps() {
}
v := s.Iterator.Val()
if bytes.Equal(first.SortedField(), v.SortedField()) {
- key := hashDataPoint(v.DataPoint)
+ key := hashDataPoint(v.GetDataPoint())
if existed, ok := s.uniqueData[key]; ok {
- if v.DataPoint.Version > existed.Version {
- s.uniqueData[key] = v.DataPoint
+ if v.GetDataPoint().Version >
existed.GetDataPoint().Version {
+ s.uniqueData[key] = v.InternalDataPoint
}
} else {
- s.uniqueData[key] = v.DataPoint
+ s.uniqueData[key] = v.InternalDataPoint
}
} else {
break
@@ -492,8 +495,12 @@ func (s *sortedMIterator) loadDps() {
}
}
-func (s *sortedMIterator) Current() []*measurev1.DataPoint {
- return []*measurev1.DataPoint{s.cur}
+func (s *sortedMIterator) Current() []*measurev1.InternalDataPoint {
+ return []*measurev1.InternalDataPoint{s.cur}
+}
+
+func (s *sortedMIterator) Close() error {
+ return nil
}
const (
@@ -512,7 +519,7 @@ func hashDataPoint(dp *measurev1.DataPoint) uint64 {
}
type pushedDownAggregatedIterator struct {
- dataPoints []*measurev1.DataPoint
+ dataPoints []*measurev1.InternalDataPoint
index int
}
@@ -524,33 +531,47 @@ func (s *pushedDownAggregatedIterator) Next() bool {
return true
}
-func (s *pushedDownAggregatedIterator) Current() []*measurev1.DataPoint {
+func (s *pushedDownAggregatedIterator) Current()
[]*measurev1.InternalDataPoint {
if s.index == 0 || s.index > len(s.dataPoints) {
return nil
}
- return []*measurev1.DataPoint{s.dataPoints[s.index-1]}
+ return []*measurev1.InternalDataPoint{s.dataPoints[s.index-1]}
}
func (s *pushedDownAggregatedIterator) Close() error {
return nil
}
-// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas.
-func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {
+// deduplicateAggregatedDataPointsWithShard removes duplicate aggregated results from multiple replicas
+// of the same shard, while preserving results from different shards.
+func deduplicateAggregatedDataPointsWithShard(dataPoints []*measurev1.InternalDataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.InternalDataPoint, error) {
if len(groupByTagsRefs) == 0 {
return dataPoints, nil
}
+ // key = hash(shard_id, group_key)
+ // Data points from the same shard with the same group key are deduplicated,
+ // while data points from different shards with the same group key are preserved.
groupMap := make(map[uint64]struct{})
- result := make([]*measurev1.DataPoint, 0, len(dataPoints))
- for _, dp := range dataPoints {
- key, err := formatGroupByKey(dp, groupByTagsRefs)
- if err != nil {
- return nil, err
+ result := make([]*measurev1.InternalDataPoint, 0, len(dataPoints))
+ for _, idp := range dataPoints {
+ groupKey, keyErr := formatGroupByKey(idp.DataPoint,
groupByTagsRefs)
+ if keyErr != nil {
+ return nil, keyErr
}
+ // Include shard_id in key calculation
+ key := hashWithShard(uint64(idp.ShardId), groupKey)
if _, exists := groupMap[key]; !exists {
groupMap[key] = struct{}{}
- result = append(result, dp)
+ result = append(result, idp)
}
}
return result, nil
}
+
+// hashWithShard combines shard_id and group_key into a single hash.
+func hashWithShard(shardID, groupKey uint64) uint64 {
+ h := uint64(offset64)
+ h = (h ^ shardID) * prime64
+ h = (h ^ groupKey) * prime64
+ return h
+}
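
Note (illustration only): the keying idea behind deduplicateAggregatedDataPointsWithShard is that the dedup key is derived from both the shard ID and the group-by key, so replicas of one shard collapse while equal group keys from different shards survive. The self-contained sketch below spells out FNV-1a constants explicitly; the production code reuses its own offset64/prime64 constants defined elsewhere in the file.

    package main

    import "fmt"

    // hashWithShard folds the shard ID and the group key into one dedup key.
    func hashWithShard(shardID, groupKey uint64) uint64 {
        const offset64 = 14695981039346656037 // FNV-1a 64-bit offset basis
        const prime64 = 1099511628211         // FNV-1a 64-bit prime
        h := uint64(offset64)
        h = (h ^ shardID) * prime64
        h = (h ^ groupKey) * prime64
        return h
    }

    func main() {
        groupKey := uint64(42) // the same aggregated group on every replica
        seen := map[uint64]struct{}{}
        kept := 0
        for _, shard := range []uint64{1, 1, 2} { // shard 1 twice (replicas), shard 2 once
            k := hashWithShard(shard, groupKey)
            if _, ok := seen[k]; ok {
                continue // duplicate replica of the same shard: dropped
            }
            seen[k] = struct{}{}
            kept++
        }
        fmt.Println(kept) // 2: one result per shard is preserved
    }
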
diff --git a/pkg/query/logical/measure/measure_plan_distributed_test.go b/pkg/query/logical/measure/measure_plan_distributed_test.go
index e8e1dfb2..7468b4fd 100644
--- a/pkg/query/logical/measure/measure_plan_distributed_test.go
+++ b/pkg/query/logical/measure/measure_plan_distributed_test.go
@@ -25,7 +25,10 @@ import (
"google.golang.org/protobuf/testing/protocmp"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
type mockIterator struct {
@@ -46,6 +49,19 @@ func (m *mockIterator) Close() error {
return nil
}
+func makeComparableDP(sid uint64, seconds, nanos int64, version int64, sortField byte) *comparableDataPoint {
+	return &comparableDataPoint{
+		InternalDataPoint: &measurev1.InternalDataPoint{
+			DataPoint: &measurev1.DataPoint{
+				Sid:       sid,
+				Timestamp: &timestamppb.Timestamp{Seconds: seconds, Nanos: int32(nanos)},
+				Version:   version,
+			},
+		},
+		sortField: []byte{sortField},
+	}
+}
+
func TestSortedMIterator(t *testing.T) {
testCases := []struct {
name string
@@ -60,8 +76,8 @@ func TestSortedMIterator(t *testing.T) {
{
name: "all data points are the same",
data: []*comparableDataPoint{
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
+ makeComparableDP(1, 1, 1, 1, 1),
+ makeComparableDP(1, 1, 1, 1, 1),
},
want: []*measurev1.DataPoint{
	{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1},
@@ -70,8 +86,8 @@ func TestSortedMIterator(t *testing.T) {
{
name: "identical data points with different sort
fields",
data: []*comparableDataPoint{
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{2}},
+ makeComparableDP(1, 1, 1, 1, 1),
+ makeComparableDP(1, 1, 1, 1, 2),
},
want: []*measurev1.DataPoint{
	{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1},
@@ -81,8 +97,8 @@ func TestSortedMIterator(t *testing.T) {
{
name: "different data points with different sort
fields",
data: []*comparableDataPoint{
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 1}, sortField: []byte{2}},
+ makeComparableDP(1, 1, 1, 1, 1),
+ makeComparableDP(1, 2, 2, 1, 2),
},
want: []*measurev1.DataPoint{
	{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1},
@@ -92,9 +108,9 @@ func TestSortedMIterator(t *testing.T) {
{
name: "identical data points with different versions",
data: []*comparableDataPoint{
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 1}, sortField: []byte{1}},
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2}, sortField: []byte{1}},
+ makeComparableDP(1, 1, 1, 1, 1),
+ makeComparableDP(1, 2, 2, 1, 1),
+ makeComparableDP(1, 1, 1, 2, 1),
},
want: []*measurev1.DataPoint{
	{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2},
@@ -102,17 +118,17 @@ func TestSortedMIterator(t *testing.T) {
},
},
{
- name: "identical data points with different versions",
+ name: "multiple sids with different versions",
data: []*comparableDataPoint{
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
- {DataPoint: &measurev1.DataPoint{Sid: 2, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 1}, sortField: []byte{1}},
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2}, sortField: []byte{1}},
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 2}, sortField: []byte{2}},
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 1}, sortField: []byte{2}},
- {DataPoint: &measurev1.DataPoint{Sid: 2, Timestamp: &timestamppb.Timestamp{Seconds: 2, Nanos: 2}, Version: 2}, sortField: []byte{2}},
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 3, Nanos: 3}, Version: 1}, sortField: []byte{3}},
- {DataPoint: &measurev1.DataPoint{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 3, Nanos: 3}, Version: 2}, sortField: []byte{3}},
- {DataPoint: &measurev1.DataPoint{Sid: 3, Timestamp: &timestamppb.Timestamp{Seconds: 3, Nanos: 3}, Version: 2}, sortField: []byte{3}},
+ makeComparableDP(1, 1, 1, 1, 1),
+ makeComparableDP(2, 1, 1, 1, 1),
+ makeComparableDP(1, 1, 1, 2, 1),
+ makeComparableDP(1, 2, 2, 2, 2),
+ makeComparableDP(1, 2, 2, 1, 2),
+ makeComparableDP(2, 2, 2, 2, 2),
+ makeComparableDP(1, 3, 3, 1, 3),
+ makeComparableDP(1, 3, 3, 2, 3),
+ makeComparableDP(3, 3, 3, 2, 3),
},
want: []*measurev1.DataPoint{
	{Sid: 1, Timestamp: &timestamppb.Timestamp{Seconds: 1, Nanos: 1}, Version: 2},
@@ -135,7 +151,7 @@ func TestSortedMIterator(t *testing.T) {
got := make([]*measurev1.DataPoint, 0)
for iter.Next() {
- got = append(got, iter.Current()[0])
+ got = append(got, iter.Current()[0].GetDataPoint())
}
slices.SortFunc(got, func(a, b *measurev1.DataPoint)
int {
@@ -158,3 +174,131 @@ func TestSortedMIterator(t *testing.T) {
})
}
}
+
+func makeInternalDP(shardID uint32, tagValue string) *measurev1.InternalDataPoint {
+	return &measurev1.InternalDataPoint{
+		ShardId: shardID,
+		DataPoint: &measurev1.DataPoint{
+			TagFamilies: []*modelv1.TagFamily{
+				{
+					Name: "default",
+					Tags: []*modelv1.Tag{
+						{Key: "group_tag", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: tagValue}}}},
+					},
+				},
+			},
+		},
+	}
+}
+
+func makeGroupByTagsRefs() [][]*logical.TagRef {
+ return [][]*logical.TagRef{
+ {
+ {
+ Tag: &logical.Tag{},
+ Spec: &logical.TagSpec{TagFamilyIdx: 0, TagIdx:
0, Spec: &databasev1.TagSpec{Type: databasev1.TagType_TAG_TYPE_STRING}},
+ },
+ },
+ }
+}
+
+func TestDeduplicateAggregatedDataPointsWithShard(t *testing.T) {
+ testCases := []struct {
+ name string
+ dataPoints []*measurev1.InternalDataPoint
+ groupByTagsRefs [][]*logical.TagRef
+ wantShardIDs []uint32
+ wantTagValues []string
+ wantLen int
+ expectErr bool
+ }{
+ {
+ name: "empty groupByTagsRefs returns all
data points",
+ dataPoints:
[]*measurev1.InternalDataPoint{makeInternalDP(1, "a"), makeInternalDP(2, "a")},
+ groupByTagsRefs: nil,
+ wantLen: 2,
+ wantShardIDs: []uint32{1, 2},
+ wantTagValues: []string{"a", "a"},
+ },
+ {
+ name: "deduplicate replicas of same shard
with same group key",
+ dataPoints:
[]*measurev1.InternalDataPoint{makeInternalDP(1, "a"), makeInternalDP(1, "a")},
+ groupByTagsRefs: makeGroupByTagsRefs(),
+ wantLen: 1,
+ wantShardIDs: []uint32{1},
+ wantTagValues: []string{"a"},
+ },
+ {
+ name: "preserve data from different shards
with same group key",
+ dataPoints:
[]*measurev1.InternalDataPoint{makeInternalDP(1, "a"), makeInternalDP(2, "a")},
+ groupByTagsRefs: makeGroupByTagsRefs(),
+ wantLen: 2,
+ wantShardIDs: []uint32{1, 2},
+ wantTagValues: []string{"a", "a"},
+ },
+ {
+ name: "preserve data from same shard with different
group keys",
+ dataPoints: []*measurev1.InternalDataPoint{
+ makeInternalDP(1, "a"),
+ makeInternalDP(1, "b"),
+ },
+ groupByTagsRefs: makeGroupByTagsRefs(),
+ wantLen: 2,
+ wantShardIDs: []uint32{1, 1},
+ wantTagValues: []string{"a", "b"},
+ },
+ {
+ name: "complex scenario: multiple shards and group keys
with replicas",
+ dataPoints: []*measurev1.InternalDataPoint{
+ makeInternalDP(1, "a"),
+ makeInternalDP(1, "a"), // replica, should be
deduplicated
+ makeInternalDP(2, "a"), // different shard, keep
+ makeInternalDP(1, "b"), // different group key,
keep
+ makeInternalDP(2, "b"), // different shard, keep
+ makeInternalDP(2, "b"), // replica, should be
deduplicated
+ },
+ groupByTagsRefs: makeGroupByTagsRefs(),
+ wantLen: 4,
+ wantShardIDs: []uint32{1, 2, 1, 2},
+ wantTagValues: []string{"a", "a", "b", "b"},
+ },
+ {
+ name: "empty data points",
+ dataPoints: []*measurev1.InternalDataPoint{},
+ groupByTagsRefs: makeGroupByTagsRefs(),
+ wantLen: 0,
+ wantShardIDs: []uint32{},
+ wantTagValues: []string{},
+ },
+ }
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			got, err := deduplicateAggregatedDataPointsWithShard(tc.dataPoints, tc.groupByTagsRefs)
+			if tc.expectErr {
+				if err == nil {
+					t.Errorf("expected error but got nil")
+				}
+				return
+			}
+			if err != nil {
+				t.Errorf("unexpected error: %v", err)
+				return
+			}
+			if len(got) != tc.wantLen {
+				t.Errorf("got %d data points, want %d", len(got), tc.wantLen)
+				return
+			}
+
+			for i, dp := range got {
+				if dp.ShardId != tc.wantShardIDs[i] {
+					t.Errorf("data point %d: got shard %d, want %d", i, dp.ShardId, tc.wantShardIDs[i])
+				}
+				tagValue := dp.DataPoint.GetTagFamilies()[0].GetTags()[0].GetValue().GetStr().GetValue()
+				if tagValue != tc.wantTagValues[i] {
+					t.Errorf("data point %d: got tag value %s, want %s", i, tagValue, tc.wantTagValues[i])
+				}
+			}
+		})
+	}
+}
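
[Reviewer note] The contract the test above pins down is that replicas are dropped only when both the shard ID and the group-by key match; points from different shards or with different group keys always survive, in input order. A self-contained sketch of that rule follows; the helper name and the plain string key are illustrative stand-ins, not the shipped deduplicateAggregatedDataPointsWithShard or formatGroupByKey code.

package main

import "fmt"

// internalDP is a stand-in for measurev1.InternalDataPoint: a data point
// plus the ID of the shard it came from. groupKey stands in for whatever
// formatGroupByKey derives from the group-by tags.
type internalDP struct {
	shardID  uint32
	groupKey string
}

// dedupByShardAndGroup keeps the first point seen for each
// (shardID, groupKey) pair and preserves input order otherwise.
func dedupByShardAndGroup(in []internalDP) []internalDP {
	type dedupKey struct {
		shardID  uint32
		groupKey string
	}
	seen := make(map[dedupKey]struct{}, len(in))
	out := make([]internalDP, 0, len(in))
	for _, dp := range in {
		k := dedupKey{shardID: dp.shardID, groupKey: dp.groupKey}
		if _, ok := seen[k]; ok {
			continue // replica of an already-seen shard/group pair
		}
		seen[k] = struct{}{}
		out = append(out, dp)
	}
	return out
}

func main() {
	in := []internalDP{
		{1, "a"}, {1, "a"}, // replicas: the second is dropped
		{2, "a"},           // same group key, different shard: kept
		{1, "b"},           // same shard, different group key: kept
	}
	fmt.Println(dedupByShardAndGroup(in)) // [{1 a} {2 a} {1 b}]
}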
diff --git a/pkg/query/logical/measure/measure_plan_groupby.go b/pkg/query/logical/measure/measure_plan_groupby.go
index 6584fd71..6a54aeee 100644
--- a/pkg/query/logical/measure/measure_plan_groupby.go
+++ b/pkg/query/logical/measure/measure_plan_groupby.go
@@ -129,24 +129,24 @@ func (g *groupBy) hash(ec context.Context) (mit executor.MIterator, err error) {
err = multierr.Append(err, iter.Close())
}()
- groupMap := make(map[uint64][]*measurev1.DataPoint)
+ groupMap := make(map[uint64][]*measurev1.InternalDataPoint)
groupLst := make([]uint64, 0)
for iter.Next() {
dataPoints := iter.Current()
- for _, dp := range dataPoints {
- key, innerErr := formatGroupByKey(dp, g.groupByTagsRefs)
+ for _, idp := range dataPoints {
+			key, innerErr := formatGroupByKey(idp.GetDataPoint(), g.groupByTagsRefs)
if innerErr != nil {
return nil, innerErr
}
group, ok := groupMap[key]
if !ok {
- group = make([]*measurev1.DataPoint, 0)
+ group = make([]*measurev1.InternalDataPoint, 0)
groupLst = append(groupLst, key)
}
if group == nil {
 				return nil, errors.New("aggregation op does not exist")
}
- group = append(group, dp)
+ group = append(group, idp)
groupMap[key] = group
}
}
@@ -184,12 +184,12 @@ func formatGroupByKey(point *measurev1.DataPoint, groupByTagsRefs [][]*logical.T
}
type groupIterator struct {
- groupMap map[uint64][]*measurev1.DataPoint
+ groupMap map[uint64][]*measurev1.InternalDataPoint
groupLst []uint64
index int
}
-func newGroupIterator(groupedMap map[uint64][]*measurev1.DataPoint, groupLst []uint64) executor.MIterator {
+func newGroupIterator(groupedMap map[uint64][]*measurev1.InternalDataPoint, groupLst []uint64) executor.MIterator {
return &groupIterator{
groupMap: groupedMap,
groupLst: groupLst,
@@ -205,7 +205,7 @@ func (gmi *groupIterator) Next() bool {
return true
}
-func (gmi *groupIterator) Current() []*measurev1.DataPoint {
+func (gmi *groupIterator) Current() []*measurev1.InternalDataPoint {
key := gmi.groupLst[gmi.index]
return gmi.groupMap[key]
}
@@ -218,9 +218,9 @@ func (gmi *groupIterator) Close() error {
type groupSortIterator struct {
iter executor.MIterator
err error
- cdp *measurev1.DataPoint
+ cdp *measurev1.InternalDataPoint
groupByTagsRefs [][]*logical.TagRef
- current []*measurev1.DataPoint
+ current []*measurev1.InternalDataPoint
index int
key uint64
closed bool
@@ -245,12 +245,12 @@ func (gmi *groupSortIterator) Next() bool {
gmi.current = append(gmi.current, gmi.cdp)
}
for {
- dp, ok := gmi.nextDP()
+ idp, ok := gmi.nextDP()
if !ok {
gmi.closed = true
return len(gmi.current) > 0
}
- k, err := formatGroupByKey(dp, gmi.groupByTagsRefs)
+		k, err := formatGroupByKey(idp.GetDataPoint(), gmi.groupByTagsRefs)
if err != nil {
gmi.closed = true
gmi.err = err
@@ -260,15 +260,15 @@ func (gmi *groupSortIterator) Next() bool {
gmi.key = k
}
if gmi.key != k {
- gmi.cdp = dp
+ gmi.cdp = idp
gmi.key = k
return true
}
- gmi.current = append(gmi.current, dp)
+ gmi.current = append(gmi.current, idp)
}
}
-func (gmi *groupSortIterator) Current() []*measurev1.DataPoint {
+func (gmi *groupSortIterator) Current() []*measurev1.InternalDataPoint {
return gmi.current
}
@@ -277,7 +277,7 @@ func (gmi *groupSortIterator) Close() error {
return multierr.Combine(gmi.err, gmi.iter.Close())
}
-func (gmi *groupSortIterator) nextDP() (*measurev1.DataPoint, bool) {
+func (gmi *groupSortIterator) nextDP() (*measurev1.InternalDataPoint, bool) {
if gmi.index < 0 {
if ok := gmi.iter.Next(); !ok {
return nil, false
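
[Reviewer note] The hash path above keeps the map-plus-key-list pairing (groupMap for group members, groupLst for first-seen key order) so iteration over groups stays deterministic; it only switches the element type to InternalDataPoint. Isolated with simplified types, the pattern is roughly the sketch below; the names mirror the diff but the snippet is an illustration, not the production code.

package main

import "fmt"

// groupInOrder buckets items by their parallel key while recording the
// order in which each key was first seen, mirroring the groupMap/groupLst
// pair kept by the hash() method above.
func groupInOrder(keys []uint64, items []string) (map[uint64][]string, []uint64) {
	groupMap := make(map[uint64][]string)
	groupLst := make([]uint64, 0)
	for i, k := range keys {
		if _, ok := groupMap[k]; !ok {
			groupLst = append(groupLst, k)
		}
		groupMap[k] = append(groupMap[k], items[i])
	}
	return groupMap, groupLst
}

func main() {
	groups, order := groupInOrder([]uint64{7, 3, 7}, []string{"a", "b", "c"})
	for _, k := range order {
		fmt.Println(k, groups[k]) // prints 7 [a c], then 3 [b]
	}
}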
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index 9f4a2270..f82dee2f 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -24,6 +24,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
+ "github.com/apache/skywalking-banyandb/api/common"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -279,7 +280,7 @@ type resultMIterator struct {
result model.MeasureQueryResult
hiddenTags logical.HiddenTagSet
err error
- current []*measurev1.DataPoint
+ current []*measurev1.InternalDataPoint
projectionTags []model.TagProjection
projectionFields []string
i int
@@ -364,14 +365,21 @@ func (ei *resultMIterator) Next() bool {
})
}
}
- ei.current = append(ei.current, dp)
+ var shardID common.ShardID
+ if len(r.ShardIDs) > i {
+ shardID = r.ShardIDs[i]
+ }
+ ei.current = append(ei.current, &measurev1.InternalDataPoint{
+ DataPoint: dp,
+ ShardId: uint32(shardID),
+ })
}
return true
}
-func (ei *resultMIterator) Current() []*measurev1.DataPoint {
- return []*measurev1.DataPoint{ei.current[ei.i]}
+func (ei *resultMIterator) Current() []*measurev1.InternalDataPoint {
+ return []*measurev1.InternalDataPoint{ei.current[ei.i]}
}
func (ei *resultMIterator) Close() error {
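
[Reviewer note] The resultMIterator hunk above tags each data point with the shard ID stored at the same index of r.ShardIDs, defaulting to shard 0 when that slice is shorter. The guard in isolation, with simplified stand-in types (not the production code), looks like this sketch.

package main

import "fmt"

// shardID and row are local stand-ins for common.ShardID and the wrapped
// data point; they only exist to keep this sketch self-contained.
type shardID uint32

type row struct {
	value string
	shard shardID
}

// attachShards pairs each value with the shard ID stored at the same
// index, falling back to shard 0 when shardIDs is shorter than values.
func attachShards(values []string, shardIDs []shardID) []row {
	out := make([]row, 0, len(values))
	for i, v := range values {
		var s shardID
		if i < len(shardIDs) {
			s = shardIDs[i]
		}
		out = append(out, row{value: v, shard: s})
	}
	return out
}

func main() {
	fmt.Println(attachShards([]string{"a", "b", "c"}, []shardID{7, 9}))
	// [{a 7} {b 9} {c 0}]
}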
diff --git a/pkg/query/logical/measure/measure_plan_top.go b/pkg/query/logical/measure/measure_plan_top.go
index 25fc5062..559b635e 100644
--- a/pkg/query/logical/measure/measure_plan_top.go
+++ b/pkg/query/logical/measure/measure_plan_top.go
@@ -102,12 +102,12 @@ func (g *topOp) Execute(ec context.Context) (mit executor.MIterator, err error)
g.topNStream.Purge()
for iter.Next() {
dpp := iter.Current()
- for _, dp := range dpp {
- value := dp.GetFields()[g.fieldRef.Spec.FieldIdx].
+ for _, idp := range dpp {
+			value := idp.GetDataPoint().GetFields()[g.fieldRef.Spec.FieldIdx].
 				GetValue().
 				GetInt().
 				GetValue()
- g.topNStream.Insert(NewTopElement(dp, value))
+ g.topNStream.Insert(NewTopElement(idp, value))
}
}
return newTopIterator(g.topNStream.Elements()), nil
@@ -130,8 +130,8 @@ func (ami *topIterator) Next() bool {
return ami.index < len(ami.elements)
}
-func (ami *topIterator) Current() []*measurev1.DataPoint {
- return []*measurev1.DataPoint{ami.elements[ami.index].dp}
+func (ami *topIterator) Current() []*measurev1.InternalDataPoint {
+ return []*measurev1.InternalDataPoint{ami.elements[ami.index].idp}
}
func (ami *topIterator) Close() error {
diff --git a/pkg/query/logical/measure/measure_top.go b/pkg/query/logical/measure/measure_top.go
index 39bec105..f7614e62 100644
--- a/pkg/query/logical/measure/measure_top.go
+++ b/pkg/query/logical/measure/measure_top.go
@@ -27,14 +27,14 @@ import (
 // TopElement seals a sortable value and its data point which this value belongs to.
type TopElement struct {
- dp *measurev1.DataPoint
+ idp *measurev1.InternalDataPoint
value int64
}
// NewTopElement returns a TopElement.
-func NewTopElement(dp *measurev1.DataPoint, value int64) TopElement {
+func NewTopElement(idp *measurev1.InternalDataPoint, value int64) TopElement {
return TopElement{
- dp: dp,
+ idp: idp,
value: value,
}
}
diff --git a/pkg/query/logical/measure/topn_plan_localscan.go b/pkg/query/logical/measure/topn_plan_localscan.go
index be85e760..418577e4 100644
--- a/pkg/query/logical/measure/topn_plan_localscan.go
+++ b/pkg/query/logical/measure/topn_plan_localscan.go
@@ -177,7 +177,7 @@ func (i *localScan) Schema() logical.Schema {
type topNMIterator struct {
result model.MeasureQueryResult
err error
- current []*measurev1.DataPoint
+ current []*measurev1.InternalDataPoint
}
func (ei *topNMIterator) Next() bool {
@@ -213,6 +213,10 @@ func (ei *topNMIterator) Next() bool {
 				ei.err = multierr.Append(ei.err, errors.WithMessagef(err, "failed to unmarshal topN values[%d]:[%s]%s", i, ts, hex.EncodeToString(fv.GetBinaryData())))
continue
}
+ shardID := uint32(0)
+ if i < len(r.ShardIDs) {
+ shardID = uint32(r.ShardIDs[i])
+ }
fieldName, entityNames, values, entities := topNValue.Values()
for j := range entities {
dp := &measurev1.DataPoint{
@@ -240,13 +244,13 @@ func (ei *topNMIterator) Next() bool {
},
},
})
- ei.current = append(ei.current, dp)
+			ei.current = append(ei.current, &measurev1.InternalDataPoint{DataPoint: dp, ShardId: shardID})
}
}
return true
}
-func (ei *topNMIterator) Current() []*measurev1.DataPoint {
+func (ei *topNMIterator) Current() []*measurev1.InternalDataPoint {
return ei.current
}
diff --git a/pkg/query/logical/measure/topn_plan_merge.go b/pkg/query/logical/measure/topn_plan_merge.go
index 62bce81d..bb31a99e 100644
--- a/pkg/query/logical/measure/topn_plan_merge.go
+++ b/pkg/query/logical/measure/topn_plan_merge.go
@@ -97,7 +97,7 @@ func (t *topNMerger) Schema() logical.Schema {
type topNMergingIterator struct {
iters []executor.MIterator
- currentDps []*measurev1.DataPoint
+ currentDps []*measurev1.InternalDataPoint
currentIt int
}
@@ -125,11 +125,11 @@ func (t *topNMergingIterator) Next() bool {
return false
}
-func (t *topNMergingIterator) Current() []*measurev1.DataPoint {
+func (t *topNMergingIterator) Current() []*measurev1.InternalDataPoint {
if len(t.currentDps) == 0 {
return nil
}
- return []*measurev1.DataPoint{t.currentDps[0]}
+ return []*measurev1.InternalDataPoint{t.currentDps[0]}
}
func (t *topNMergingIterator) Close() error {
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 4aa313ea..246d8c29 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -73,6 +73,7 @@ type MeasureResult struct {
Error error
Timestamps []int64
Versions []int64
+ ShardIDs []common.ShardID
TagFamilies []TagFamily
Fields []Field
SID common.SeriesID
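
[Reviewer note] With ShardIDs added, MeasureResult carries one shard ID per row, parallel to Timestamps and Versions; that is what the local-scan and TopN iterators above read when they wrap each row into an InternalDataPoint. A tiny self-contained illustration of the parallel-slice invariant follows, using local stand-in types rather than the real common and model packages.

package main

import "fmt"

// Local stand-ins: shardID mirrors common.ShardID, and result mirrors the
// per-row slices of model.MeasureResult shown above.
type shardID uint32

type result struct {
	Timestamps []int64
	Versions   []int64
	ShardIDs   []shardID
}

func main() {
	// All per-row slices grow in lockstep, so row i's timestamp, version
	// and origin shard live at the same index i.
	var r result
	for i := 0; i < 3; i++ {
		r.Timestamps = append(r.Timestamps, 1700000000+int64(i))
		r.Versions = append(r.Versions, 1)
		r.ShardIDs = append(r.ShardIDs, 2)
	}
	fmt.Println(len(r.Timestamps) == len(r.ShardIDs)) // true: parallel per-row slices
}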