This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cee6c26 Distributed query (#326)
5cee6c26 is described below

commit 5cee6c2600153cf4d71fde15a169f8467bc5e002
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Mon Sep 11 16:45:02 2023 +0800

    Distributed query (#326)
---
 CHANGES.md                                         |   1 +
 api/proto/banyandb/cluster/v1/rpc.proto            |  18 +-
 api/proto/banyandb/database/v1/database.proto      |   3 +-
 banyand/dquery/dquery.go                           |  87 +++
 banyand/dquery/measure.go                          | 100 ++++
 banyand/dquery/stream.go                           |  84 +++
 banyand/dquery/topn.go                             | 122 ++++
 banyand/internal/cmd/liaison.go                    |   6 +
 banyand/measure/measure.go                         |  38 +-
 banyand/measure/measure_write.go                   |   2 +-
 banyand/measure/metadata.go                        |  63 ++-
 banyand/measure/metadata_test.go                   |  10 +-
 banyand/measure/service.go                         |   6 +-
 banyand/query/processor.go                         |  15 +-
 banyand/queue/local.go                             |   8 +
 banyand/queue/pub/pub.go                           | 147 ++++-
 banyand/queue/queue.go                             |   1 +
 banyand/queue/sub/server.go                        |   2 +-
 banyand/queue/sub/sub.go                           |  56 +-
 banyand/stream/metadata.go                         |  65 ++-
 banyand/stream/metadata_test.go                    |  10 +-
 banyand/stream/service.go                          |  12 +-
 banyand/stream/stream.go                           |  34 +-
 docs/api-reference.md                              | 627 +++++++++++----------
 pkg/bus/bus.go                                     |   5 +
 pkg/iter/sort/sort.go                              | 133 +++++
 pkg/iter/sort/sort_test.go                         | 102 ++++
 pkg/query/executor/interface.go                    |  58 +-
 pkg/query/logical/common.go                        |  32 +-
 pkg/query/logical/index_filter.go                  |  14 +-
 pkg/query/logical/iter.go                          | 134 -----
 pkg/query/logical/measure/measure_analyzer.go      |  67 ++-
 pkg/query/logical/measure/measure_plan.go          |   3 +-
 .../logical/measure/measure_plan_aggregation.go    |   3 +-
 .../logical/measure/measure_plan_distributed.go    | 234 ++++++++
 pkg/query/logical/measure/measure_plan_groupby.go  |   7 +-
 .../measure/measure_plan_indexscan_local.go        |  18 +-
 pkg/query/logical/measure/measure_plan_top.go      |   3 +-
 pkg/query/logical/stream/stream_analyzer.go        |  28 +-
 .../logical/stream/stream_plan_distributed.go      | 214 +++++++
 .../logical/stream/stream_plan_indexscan_global.go |  11 +-
 .../logical/stream/stream_plan_indexscan_local.go  |  11 +-
 pkg/query/logical/stream/stream_plan_tag_filter.go |   5 +-
 pkg/run/channel_closer.go                          |   2 +-
 pkg/schema/metadata.go                             | 364 ++++++------
 pkg/schema/schema.go                               |  96 ++++
 46 files changed, 2237 insertions(+), 824 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 54f68ee0..0d98483c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,7 @@ Release Notes.
 - Register the node role to the metadata registry.
 - Implement the remote queue to spreading data to data nodes.
 - Fix parse environment variables error
+- Implement the distributed query engine.
 
 ### Bugs
 
diff --git a/api/proto/banyandb/cluster/v1/rpc.proto 
b/api/proto/banyandb/cluster/v1/rpc.proto
index 076c21e6..732bbe1f 100644
--- a/api/proto/banyandb/cluster/v1/rpc.proto
+++ b/api/proto/banyandb/cluster/v1/rpc.proto
@@ -19,22 +19,22 @@ syntax = "proto3";
 
 package banyandb.cluster.v1;
 
-import "banyandb/measure/v1/write.proto";
-import "banyandb/stream/v1/write.proto";
+import "google/protobuf/any.proto";
 
 option go_package = 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1";
 
-message WriteRequest {
+message SendRequest {
   string topic = 1;
   uint64 message_id = 2;
-  oneof request {
-    stream.v1.InternalWriteRequest stream = 3;
-    measure.v1.InternalWriteRequest measure = 4;
-  }
+  google.protobuf.Any body = 3;
 }
 
-message WriteResponse {}
+message SendResponse {
+  uint64 message_id = 1;
+  string error = 2;
+  google.protobuf.Any body = 3;
+}
 
 service Service {
-  rpc Write(stream WriteRequest) returns (stream WriteResponse);
+  rpc Send(stream SendRequest) returns (stream SendResponse);
 }
diff --git a/api/proto/banyandb/database/v1/database.proto 
b/api/proto/banyandb/database/v1/database.proto
index 23309d0a..8ae7ffc3 100644
--- a/api/proto/banyandb/database/v1/database.proto
+++ b/api/proto/banyandb/database/v1/database.proto
@@ -29,8 +29,7 @@ enum Role {
   ROLE_UNSPECIFIED = 0;
   ROLE_META = 1;
   ROLE_DATA = 2;
-  ROLE_QUERY = 3;
-  ROLE_LIAISON = 4;
+  ROLE_LIAISON = 3;
 }
 
 message Node {
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
new file mode 100644
index 00000000..61f490c8
--- /dev/null
+++ b/banyand/dquery/dquery.go
@@ -0,0 +1,87 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dquery implements the distributed query.
+package dquery
+
+import (
+       "context"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/measure"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/stream"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/query/executor"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+const (
+       moduleName = "distributed-query"
+)
+
+// queryService is the unit returned by NewService. It keeps the metadata
+// repository, the module logger and one processor per query kind.
+type queryService struct {
+       metaService metadata.Repo
+       log         *logger.Logger // assigned in PreRun
+       sqp         *streamQueryProcessor
+       mqp         *measureQueryProcessor
+       tqp         *topNQueryProcessor
+}
+
+// NewService returns a new query service. The stream, measure and topN
+// processors all share the given broadcaster and hold a pointer back to the
+// service. The returned error is always nil.
+func NewService(metaService metadata.Repo, broadcaster bus.Broadcaster) (run.Unit, error) {
+       qs := new(queryService)
+       qs.metaService = metaService
+       qs.sqp = &streamQueryProcessor{broadcaster: broadcaster, queryService: qs}
+       qs.mqp = &measureQueryProcessor{broadcaster: broadcaster, queryService: qs}
+       qs.tqp = &topNQueryProcessor{broadcaster: broadcaster, queryService: qs}
+       return qs, nil
+}
+
+// Name returns the module name of the distributed query unit.
+func (q *queryService) Name() string {
+       return moduleName
+}
+
+// PreRun obtains the module logger and builds the portable stream and measure
+// repositories on top of the metadata service. It always returns nil.
+func (q *queryService) PreRun(_ context.Context) error {
+       l := logger.GetLogger(moduleName)
+       q.log = l
+       q.sqp.streamService = stream.NewPortableRepository(q.metaService, l)
+       q.mqp.measureService = measure.NewPortableRepository(q.metaService, l)
+       return nil
+}
+
+// distributedContext is the execution context handed to distributed query
+// plans: it exposes the embedded broadcaster and the time range of the query.
+type distributedContext struct {
+       bus.Broadcaster
+       timeRange *modelv1.TimeRange
+}
+
+// Compile-time check that distributedContext satisfies the executor contract.
+var _ executor.DistributedExecutionContext = (*distributedContext)(nil)
+
+// TimeRange returns the time range the context was created with.
+func (c *distributedContext) TimeRange() *modelv1.TimeRange {
+       return c.timeRange
+}
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
new file mode 100644
index 00000000..522a9cda
--- /dev/null
+++ b/banyand/dquery/measure.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package dquery
+
+import (
+       "context"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       "github.com/apache/skywalking-banyandb/banyand/measure"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/query/executor"
+       logical_measure 
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
+)
+
+// measureQueryProcessor answers measure query messages by building and
+// executing a distributed query plan.
+type measureQueryProcessor struct {
+       measureService measure.Query   // resolves a measure from its metadata
+       broadcaster    bus.Broadcaster // handed to the plan through distributedContext
+       *queryService
+}
+
+// Rev handles a message whose data is a *measurev1.QueryRequest. It looks up
+// the measure, builds its logical schema, analyzes the request into a
+// distributed plan and executes that plan.
+//
+// The response data is the collected []*measurev1.DataPoint on success, or a
+// common.NewError value naming the step that failed. The response message ID
+// is the wall-clock time, in nanoseconds, taken when the message was received.
+func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
+       queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
+       now := time.Now().UnixNano()
+       if !ok {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type"))
+               return
+       }
+       // Go through the nil-safe getters so a request without metadata does
+       // not panic while the logger is being named.
+       meta := queryCriteria.GetMetadata()
+       ml := p.log.Named("measure", meta.GetGroup(), meta.GetName())
+       if e := ml.Debug(); e.Enabled() {
+               e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received a query event")
+       }
+
+       ec, err := p.measureService.Measure(meta)
+       if err != nil {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to get execution context for measure %s: %v", meta.GetName(), err))
+               return
+       }
+
+       s, err := logical_measure.BuildSchema(ec.GetSchema(), ec.GetIndexRules())
+       if err != nil {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for measure %s: %v", meta.GetName(), err))
+               return
+       }
+
+       plan, err := logical_measure.DistributedAnalyze(queryCriteria, s)
+       if err != nil {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for measure %s: %v", meta.GetName(), err))
+               return
+       }
+
+       if e := ml.Debug(); e.Enabled() {
+               e.Str("plan", plan.String()).Msg("query plan")
+       }
+
+       mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(context.Background(), &distributedContext{
+               Broadcaster: p.broadcaster,
+               timeRange:   queryCriteria.TimeRange,
+       }))
+       if err != nil {
+               // This is the execution failure, not the close failure logged below.
+               ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to execute the query plan")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to execute the query plan for measure %s: %v", meta.GetName(), err))
+               return
+       }
+       defer func() {
+               // Use a local error so the deferred close never overwrites err.
+               if closeErr := mIterator.Close(); closeErr != nil {
+                       ml.Error().Err(closeErr).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan")
+               }
+       }()
+       result := make([]*measurev1.DataPoint, 0)
+       for mIterator.Next() {
+               // Only the first data point of each iteration step is kept.
+               current := mIterator.Current()
+               if len(current) > 0 {
+                       result = append(result, current[0])
+               }
+       }
+       if e := ml.Debug(); e.Enabled() {
+               e.RawJSON("ret", logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure")
+       }
+       resp = bus.NewMessage(bus.MessageID(now), result)
+       return
+}
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
new file mode 100644
index 00000000..3a92aaeb
--- /dev/null
+++ b/banyand/dquery/stream.go
@@ -0,0 +1,84 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package dquery
+
+import (
+       "context"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       "github.com/apache/skywalking-banyandb/banyand/stream"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/query/executor"
+       logical_stream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
+)
+
+// streamQueryProcessor answers stream query messages by building and
+// executing a distributed query plan.
+type streamQueryProcessor struct {
+       streamService stream.Query    // resolves a stream from its metadata
+       broadcaster   bus.Broadcaster // handed to the plan through distributedContext
+       *queryService
+}
+
+// Rev handles a message whose data is a *streamv1.QueryRequest. It resolves
+// the stream, builds its logical schema, analyzes the request into a
+// distributed plan and executes it. The response carries the plan's result on
+// success or a common.NewError value describing the failing step; its message
+// ID is the receive time in nanoseconds.
+func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
+       id := bus.MessageID(time.Now().UnixNano())
+       queryCriteria, ok := message.Data().(*streamv1.QueryRequest)
+       if !ok {
+               return bus.NewMessage(id, common.NewError("invalid event data type"))
+       }
+       if e := p.log.Debug(); e.Enabled() {
+               e.RawJSON("criteria", logger.Proto(queryCriteria)).Msg("received a query request")
+       }
+
+       meta := queryCriteria.GetMetadata()
+       ec, err := p.streamService.Stream(meta)
+       if err != nil {
+               return bus.NewMessage(id, common.NewError("fail to get execution context for stream %s: %v", meta.GetName(), err))
+       }
+
+       s, err := logical_stream.BuildSchema(ec.GetSchema(), ec.GetIndexRules())
+       if err != nil {
+               return bus.NewMessage(id, common.NewError("fail to build schema for stream %s: %v", meta.GetName(), err))
+       }
+
+       plan, err := logical_stream.DistributedAnalyze(queryCriteria, s)
+       if err != nil {
+               return bus.NewMessage(id, common.NewError("fail to analyze the query request for stream %s: %v", meta.GetName(), err))
+       }
+       if e := p.log.Debug(); e.Enabled() {
+               e.Str("plan", plan.String()).Msg("query plan")
+       }
+
+       dc := &distributedContext{
+               Broadcaster: p.broadcaster,
+               timeRange:   queryCriteria.TimeRange,
+       }
+       entities, err := plan.(executor.StreamExecutable).Execute(executor.WithDistributedExecutionContext(context.Background(), dc))
+       if err != nil {
+               p.log.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to execute the query plan")
+               return bus.NewMessage(id, common.NewError("execute the query plan for stream %s: %v", meta.GetName(), err))
+       }
+       return bus.NewMessage(id, entities)
+}
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
new file mode 100644
index 00000000..df34d337
--- /dev/null
+++ b/banyand/dquery/topn.go
@@ -0,0 +1,122 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package dquery
+
+import (
+       "go.uber.org/multierr"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/data"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/iter/sort"
+)
+
+// topNQueryProcessor answers topN query messages by broadcasting the request
+// and merging the returned lists.
+type topNQueryProcessor struct {
+       broadcaster bus.Broadcaster
+       *queryService
+}
+
+// Rev handles a message whose data is a *measurev1.TopNRequest. The request is
+// broadcast on data.TopicTopNQuery, every *measurev1.TopNList that comes back
+// is merged through a sorting iterator, and at most request.TopN items are
+// returned together with the latest timestamp seen among the replies.
+//
+// A message of the wrong type, or a request with an unspecified sort
+// direction, is logged and answered with the zero bus.Message. A broadcast
+// failure is answered with a common.NewError value. Replies that cannot be
+// fetched or that do not carry a TopNList are logged and skipped, so the
+// result is built from the remaining replies.
+func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
+       request, ok := message.Data().(*measurev1.TopNRequest)
+       if !ok {
+               t.log.Warn().Msg("invalid event data type")
+               return
+       }
+       if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
+               t.log.Warn().Msg("invalid requested sort direction")
+               return
+       }
+       if e := t.log.Debug(); e.Enabled() {
+               e.Stringer("req", request).Msg("received a topN query event")
+       }
+       // The getters are nil-safe: a request without a time range yields ID 0
+       // instead of a nil-pointer panic.
+       now := bus.MessageID(request.GetTimeRange().GetBegin().GetNanos())
+       ff, err := t.broadcaster.Broadcast(data.TopicTopNQuery, bus.NewMessage(now, request))
+       if err != nil {
+               resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.Metadata.GetName(), err))
+               return
+       }
+       var allErr error
+       sii := make([]sort.Iterator[*comparableTopNItem], 0, len(ff))
+       var latestTimestamp *timestamppb.Timestamp
+       for _, f := range ff {
+               m, getErr := f.Get()
+               if getErr != nil {
+                       allErr = multierr.Append(allErr, getErr)
+                       continue
+               }
+               // A reply may carry something other than a list (for example an
+               // error payload); skip it instead of panicking on the assertion.
+               tl, isList := m.Data().(*measurev1.TopNList)
+               if !isList || tl == nil {
+                       t.log.Warn().Msg("unexpected data type in a topN query response")
+                       continue
+               }
+               // AsTime is nil-safe, so the first comparison is against the Unix epoch.
+               if tl.GetTimestamp().AsTime().UnixNano() > latestTimestamp.AsTime().UnixNano() {
+                       latestTimestamp = tl.GetTimestamp()
+               }
+               sii = append(sii, &sortedTopNList{TopNList: tl})
+       }
+       if allErr != nil {
+               // Surface the collected errors instead of dropping them silently.
+               t.log.Error().Err(allErr).Msg("fail to get some of the topN query responses")
+       }
+       desc := request.GetFieldValueSort() == modelv1.Sort_SORT_DESC
+       iter := sort.NewItemIter[*comparableTopNItem](sii, desc)
+       defer func() {
+               _ = iter.Close()
+       }()
+       limit := int(request.TopN)
+       var items []*measurev1.TopNList_Item
+       for len(items) < limit && iter.Next() {
+               items = append(items, iter.Val().TopNList_Item)
+       }
+       resp = bus.NewMessage(now, &measurev1.TopNList{
+               Items:     items,
+               Timestamp: latestTimestamp,
+       })
+       return
+}
+
+var _ sort.Comparable = (*comparableTopNItem)(nil)
+
+// comparableTopNItem adapts a topN list item to sort.Comparable.
+type comparableTopNItem struct {
+       *measurev1.TopNList_Item
+}
+
+// SortedField returns the item's integer value encoded by
+// convert.Int64ToBytes. The nil-safe getters are used so that a nil item or a
+// value that is not an integer yields the encoding of 0 rather than a
+// nil-pointer panic.
+func (c *comparableTopNItem) SortedField() []byte {
+       return convert.Int64ToBytes(c.GetValue().GetInt().GetValue())
+}
+
+var _ sort.Iterator[*comparableTopNItem] = (*sortedTopNList)(nil)
+
+// sortedTopNList iterates over the items of a TopNList in their stored order.
+type sortedTopNList struct {
+       *measurev1.TopNList
+       index int // number of items consumed so far; Val reads Items[index-1]
+}
+
+// Close is a no-op; the list holds no resources.
+func (*sortedTopNList) Close() error {
+       return nil
+}
+
+// Next advances to the next item and reports whether one is available.
+// It returns true for every item, including the last one: the previous
+// post-increment check `index < len(Items)` dropped the final item of each
+// list and yielded nothing for a single-item list.
+func (s *sortedTopNList) Next() bool {
+       if s.index >= len(s.GetItems()) {
+               return false
+       }
+       s.index++
+       return true
+}
+
+// Val returns the item selected by the last successful call to Next.
+// It must not be called before Next has returned true.
+func (s *sortedTopNList) Val() *comparableTopNItem {
+       return &comparableTopNItem{s.Items[s.index-1]}
+}
diff --git a/banyand/internal/cmd/liaison.go b/banyand/internal/cmd/liaison.go
index 0b842201..d09428f0 100644
--- a/banyand/internal/cmd/liaison.go
+++ b/banyand/internal/cmd/liaison.go
@@ -24,6 +24,7 @@ import (
        "github.com/spf13/cobra"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/dquery"
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
        "github.com/apache/skywalking-banyandb/banyand/liaison/http"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -49,10 +50,15 @@ func newLiaisonCmd() *cobra.Command {
        profSvc := observability.NewProfService()
        metricSvc := observability.NewMetricService()
        httpServer := http.NewServer()
+       dQuery, err := dquery.NewService(metaSvc, pipeline)
+       if err != nil {
+               l.Fatal().Err(err).Msg("failed to initiate distributed query 
service")
+       }
 
        units := []run.Unit{
                new(signal.Handler),
                pipeline,
+               dQuery,
                grpcServer,
                httpServer,
                profSvc,
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 9fbfe653..b8a55140 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -22,7 +22,6 @@ package measure
 
 import (
        "context"
-       "math"
        "time"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -32,7 +31,6 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/tsdb/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
-       resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -43,18 +41,17 @@ const (
 )
 
 type measure struct {
-       databaseSupplier       tsdb.Supplier
-       l                      *logger.Logger
-       schema                 *databasev1.Measure
-       indexWriter            *index.Writer
-       processorManager       *topNProcessorManager
-       name                   string
-       group                  string
-       indexRules             []*databasev1.IndexRule
-       topNAggregations       []*databasev1.TopNAggregation
-       maxObservedModRevision int64
-       interval               time.Duration
-       shardNum               uint32
+       databaseSupplier tsdb.Supplier
+       l                *logger.Logger
+       schema           *databasev1.Measure
+       indexWriter      *index.Writer
+       processorManager *topNProcessorManager
+       name             string
+       group            string
+       indexRules       []*databasev1.IndexRule
+       topNAggregations []*databasev1.TopNAggregation
+       interval         time.Duration
+       shardNum         uint32
 }
 
 func (s *measure) startSteamingManager(pipeline queue.Queue) error {
@@ -80,22 +77,10 @@ func (s *measure) GetSchema() *databasev1.Measure {
        return s.schema
 }
 
-func (s *measure) GetMetadata() *commonv1.Metadata {
-       return s.schema.Metadata
-}
-
 func (s *measure) GetIndexRules() []*databasev1.IndexRule {
        return s.indexRules
 }
 
-func (s *measure) GetTopN() []*databasev1.TopNAggregation {
-       return s.topNAggregations
-}
-
-func (s *measure) MaxObservedModRevision() int64 {
-       return s.maxObservedModRevision
-}
-
 func (s *measure) Close() error {
        if s.processorManager == nil {
                return nil
@@ -105,7 +90,6 @@ func (s *measure) Close() error {
 
 func (s *measure) parseSpec() (err error) {
        s.name, s.group = s.schema.GetMetadata().GetName(), 
s.schema.GetMetadata().GetGroup()
-       s.maxObservedModRevision = 
int64(math.Max(float64(resourceSchema.ParseMaxModRevision(s.indexRules)), 
float64(resourceSchema.ParseMaxModRevision(s.topNAggregations))))
        if s.schema.Interval != "" {
                s.interval, err = timestamp.ParseDuration(s.schema.Interval)
        }
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 2b5f561a..3235b5c6 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -134,7 +134,7 @@ func (s *measure) write(shardID common.ShardID, entity 
[]byte, entityValues tsdb
        if s.processorManager != nil {
                
s.processorManager.onMeasureWrite(&measurev1.InternalWriteRequest{
                        Request: &measurev1.WriteRequest{
-                               Metadata:  s.GetMetadata(),
+                               Metadata:  s.GetSchema().Metadata,
                                DataPoint: value,
                        },
                        EntityValues: entityValues[1:],
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 9df34dfa..1185442b 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -20,6 +20,7 @@ package measure
 import (
        "context"
        "fmt"
+       "io"
        "path"
        "time"
 
@@ -50,7 +51,7 @@ type schemaRepo struct {
 func newSchemaRepo(path string, metadata metadata.Repo,
        dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue, 
encoderBufferSize, bufferSize int64,
 ) schemaRepo {
-       return schemaRepo{
+       sr := schemaRepo{
                l:        l,
                metadata: metadata,
                Repository: resourceSchema.NewRepository(
@@ -59,6 +60,38 @@ func newSchemaRepo(path string, metadata metadata.Repo,
                        newSupplier(path, metadata, dbOpts, l, pipeline, 
encoderBufferSize, bufferSize),
                ),
        }
+       sr.start()
+       return sr
+}
+
+// NewPortableRepository creates a new portable repository: a schemaRepo
+// backed by resourceSchema.NewPortableRepository and a portable supplier.
+// The repository is started before it is returned.
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) Query {
+       repo := resourceSchema.NewPortableRepository(metadata, l, newPortableSupplier(metadata, l))
+       sr := &schemaRepo{
+               l:          l,
+               metadata:   metadata,
+               Repository: repo,
+       }
+       sr.start()
+       return sr
+}
+
+func (sr *schemaRepo) start() {
+       sr.Watcher()
+       sr.metadata.
+               RegisterHandler("measure", 
schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
+                       sr)
+}
+
+// Measure returns the loaded measure identified by metadata, or
+// ErrMeasureNotExist (with a stack trace attached) when it cannot be loaded.
+func (sr *schemaRepo) Measure(metadata *commonv1.Metadata) (Measure, error) {
+       if sm, ok := sr.loadMeasure(metadata); ok {
+               return sm, nil
+       }
+       return nil, errors.WithStack(ErrMeasureNotExist)
+}
 
 func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
@@ -263,7 +296,7 @@ func (sr *schemaRepo) loadMeasure(metadata 
*commonv1.Metadata) (*measure, bool)
        if !ok {
                return nil, false
        }
-       s, ok := r.(*measure)
+       s, ok := r.Delegated().(*measure)
        return s, ok
 }
 
@@ -292,12 +325,12 @@ func newSupplier(path string, metadata metadata.Repo, 
dbOpts tsdb.DatabaseOpts,
        }
 }
 
-func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec 
resourceSchema.ResourceSpec) (resourceSchema.Resource, error) {
-       measureSchema := spec.Schema.(*databasev1.Measure)
+func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec 
resourceSchema.Resource) (io.Closer, error) {
+       measureSchema := spec.Schema().(*databasev1.Measure)
        return openMeasure(shardNum, db, measureSpec{
                schema:           measureSchema,
-               indexRules:       spec.IndexRules,
-               topNAggregations: spec.Aggregations,
+               indexRules:       spec.IndexRules(),
+               topNAggregations: spec.TopN(),
        }, s.l, s.pipeline)
 }
 
@@ -349,3 +382,21 @@ func intervalFn(key []byte) time.Duration {
        }
        return interval
 }
+
+// portableSupplier resolves measure schemas straight from the metadata
+// registry.
+type portableSupplier struct {
+       metadata metadata.Repo
+       l        *logger.Logger
+}
+
+// newPortableSupplier returns a portableSupplier bound to the given metadata
+// repository and logger.
+func newPortableSupplier(metadata metadata.Repo, l *logger.Logger) *portableSupplier {
+       return &portableSupplier{
+               metadata: metadata,
+               l:        l,
+       }
+}
+
+// ResourceSchema fetches the measure identified by md from the measure
+// registry, giving the lookup at most five seconds.
+//
+// On failure it returns a literal nil schema. If GetMeasure returns a concrete
+// pointer type, forwarding its result directly would wrap a nil pointer in a
+// non-nil interface value.
+func (s *portableSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+       m, err := s.metadata.MeasureRegistry().GetMeasure(ctx, md)
+       if err != nil {
+               return nil, err
+       }
+       return m, nil
+}
diff --git a/banyand/measure/metadata_test.go b/banyand/measure/metadata_test.go
index 9504c1f9..f6cda0f1 100644
--- a/banyand/measure/metadata_test.go
+++ b/banyand/measure/metadata_test.go
@@ -37,6 +37,10 @@ var _ = Describe("Metadata", func() {
        BeforeEach(func() {
                svcs, deferFn = setUp()
                goods = gleak.Goroutines()
+               Eventually(func() bool {
+                       _, ok := svcs.measure.LoadGroup("sw_metric")
+                       return ok
+               }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
        })
 
        AfterEach(func() {
@@ -45,12 +49,6 @@ var _ = Describe("Metadata", func() {
        })
 
        Context("Manage group", func() {
-               It("should pass smoke test", func() {
-                       Eventually(func() bool {
-                               _, ok := svcs.measure.LoadGroup("sw_metric")
-                               return ok
-                       }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
-               })
                It("should close the group", func() {
                        deleted, err := 
svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "sw_metric")
                        Expect(err).ShouldNot(HaveOccurred())
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 47c93684..43940d8f 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -27,7 +27,6 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
-       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
@@ -114,10 +113,7 @@ func (s *service) PreRun(_ context.Context) error {
        s.schemaRepo = newSchemaRepo(path, s.metadata, s.dbOpts,
                s.l, s.localPipeline, int64(s.BlockEncoderBufferSize), 
int64(s.BlockBufferSize))
        // run a serial watcher
-       go s.schemaRepo.Watcher()
-       s.metadata.
-               RegisterHandler("measure", 
schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
-                       &s.schemaRepo)
+
        s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo)
        err := s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener)
        if err != nil {
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 67df4b56..a47e6df8 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -25,7 +25,6 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
@@ -83,8 +82,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to get execution context for stream %s: %v", meta.GetName(), err))
                return
        }
-
-       s, err := logical_stream.BuildSchema(ec)
+       s, err := logical_stream.BuildSchema(ec.GetSchema(), ec.GetIndexRules())
        if err != nil {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to build schema for stream %s: %v", meta.GetName(), err))
                return
@@ -99,8 +97,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
        if p.log.Debug().Enabled() {
                p.log.Debug().Str("plan", plan.String()).Msg("query plan")
        }
-
-       entities, err := plan.(executor.StreamExecutable).Execute(ec)
+       entities, err := 
plan.(executor.StreamExecutable).Execute(executor.WithStreamExecutionContext(context.Background(),
 ec))
        if err != nil {
                p.log.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to execute the query plan")
                resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("execute the query plan for stream %s: %v", meta.GetName(), 
err))
@@ -136,7 +133,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
                return
        }
 
-       s, err := logical_measure.BuildSchema(ec)
+       s, err := logical_measure.BuildSchema(ec.GetSchema(), 
ec.GetIndexRules())
        if err != nil {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to build schema for measure %s: %v", meta.GetName(), err))
                return
@@ -152,7 +149,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
                e.Str("plan", plan.String()).Msg("query plan")
        }
 
-       mIterator, err := plan.(executor.MeasureExecutable).Execute(ec)
+       mIterator, err := 
plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(context.Background(),
 ec))
        if err != nil {
                ml.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to close the query plan")
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to execute the query plan for measure %s: %v", meta.GetName(), err))
@@ -181,10 +178,6 @@ func (q *queryService) Name() string {
        return moduleName
 }
 
-func (q *queryService) Role() databasev1.Role {
-       return databasev1.Role_ROLE_QUERY
-}
-
 func (q *queryService) PreRun(_ context.Context) error {
        q.log = logger.GetLogger(moduleName)
        return multierr.Combine(
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 659e908e..09ca07aa 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -62,6 +62,14 @@ func (l *local) Publish(topic bus.Topic, message 
...bus.Message) (bus.Future, er
        return l.local.Publish(topic, message...)
 }
 
+// Broadcast hands message to Publish on the local pipeline and wraps the
+// single future that Publish produces in a one-element slice. When Publish
+// fails, nil is returned together with the Publish error, unchanged.
+func (l *local) Broadcast(topic bus.Topic, message bus.Message) ([]bus.Future, error) {
+       single, publishErr := l.Publish(topic, message)
+       if publishErr == nil {
+               return []bus.Future{single}, nil
+       }
+       return nil, publishErr
+}
+
 func (l local) Name() string {
        return "local-pipeline"
 }
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 1f3e3cf4..b72e9a07 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -20,15 +20,17 @@ package pub
 
 import (
        "context"
+       "errors"
        "fmt"
+       "io"
        "sync"
        "time"
 
        "go.uber.org/multierr"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/anypb"
 
        clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
-       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
-       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/queue"
@@ -66,8 +68,56 @@ func (p *pub) Serve() run.StopNotify {
        return p.closer.CloseNotify()
 }
 
-func (*pub) Publish(_ bus.Topic, _ ...bus.Message) (bus.Future, error) {
-       panic("unimplemented")
+// Broadcast publishes the ID and data of messages to every node that has an
+// entry in p.clients, issuing one Publish call per node.
+//
+// The node names are copied under the read lock, and the lock is released
+// before any Publish call is made. The first Publish that reports an error
+// stops the broadcast: nil is returned together with that error.
+func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, error) {
+       p.mu.RLock()
+       var nodes []string
+       for node := range p.clients {
+               nodes = append(nodes, node)
+       }
+       p.mu.RUnlock()
+
+       var futures []bus.Future
+       for _, node := range nodes {
+               perNode := bus.NewMessageWithNode(messages.ID(), node, messages.Data())
+               fut, err := p.Publish(topic, perNode)
+               if err != nil {
+                       return nil, err
+               }
+               futures = append(futures, fut)
+       }
+       return futures, nil
+}
+
+// releaseOnRecv pairs a Send stream with the cancel function of the context
+// the stream was opened with. The context is released once Recv returns,
+// which is the only read the publisher's future performs on a stream.
+type releaseOnRecv struct {
+       clusterv1.Service_SendClient
+       cancel context.CancelFunc
+}
+
+// Recv reads one response from the wrapped stream and then releases the
+// stream's context.
+func (s *releaseOnRecv) Recv() (*clusterv1.SendResponse, error) {
+       defer s.cancel()
+       return s.Service_SendClient.Recv()
+}
+
+// Publish sends each message to the node named by the message over a new
+// Send stream and returns a future from which the responses can be read.
+//
+// Messages are handled independently: a message that cannot be converted,
+// has no client for its node, or fails to be sent is recorded in the
+// returned error and the remaining messages are still attempted. The future
+// only contains the streams of the messages that were sent successfully.
+//
+// Every stream is bound to a 10-second timeout covering both the send and
+// the wait for its response. The stream's context must stay alive until the
+// response has been read, so it is released after the first Recv on the
+// stream (or by the timeout if the future is never read), not when this
+// method returns.
+func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
+       var err error
+       f := &future{}
+       handleMessage := func(m bus.Message, prev error) error {
+               r, errSend := messageToRequest(topic, m)
+               if errSend != nil {
+                       return multierr.Append(prev, fmt.Errorf("failed to marshal message %T: %w", m, errSend))
+               }
+               node := m.Node()
+               // Only a lookup is performed, so the read lock is enough.
+               p.mu.RLock()
+               client, ok := p.clients[node]
+               p.mu.RUnlock()
+               if !ok {
+                       return multierr.Append(prev, fmt.Errorf("failed to get client for node %s", node))
+               }
+               ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+               stream, errCreateStream := client.client.Send(ctx)
+               // Check the error of this call, not the accumulated one: a failed
+               // Send leaves stream unusable even when no earlier message failed.
+               if errCreateStream != nil {
+                       cancel()
+                       return multierr.Append(prev, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream))
+               }
+               if errSend = stream.Send(r); errSend != nil {
+                       cancel()
+                       return multierr.Append(prev, fmt.Errorf("failed to send message to node %s: %w", node, errSend))
+               }
+               f.clients = append(f.clients, &releaseOnRecv{Service_SendClient: stream, cancel: cancel})
+               return prev
+       }
+       for _, m := range messages {
+               err = handleMessage(m, err)
+       }
+       return f, err
 }
 
 // NewBatchPublisher returns a new batch publisher.
@@ -96,7 +146,7 @@ func (p *pub) PreRun(context.Context) error {
 }
 
 type writeStream struct {
-       client clusterv1.Service_WriteClient
+       client clusterv1.Service_SendClient
        cancel func()
 }
 
@@ -113,27 +163,14 @@ func (bp *batchPublisher) Close() (err error) {
        return err
 }
 
-func (bp *batchPublisher) Publish(topic bus.Topic, message ...bus.Message) 
(bus.Future, error) {
-       var err error
-       for _, m := range message {
-               if !m.IsRemote() {
-                       continue
-               }
-               r := &clusterv1.WriteRequest{
-                       Topic:     topic.String(),
-                       MessageId: uint64(m.ID()),
-               }
-               switch d := m.Data().(type) {
-               case *streamv1.InternalWriteRequest:
-                       r.Request = &clusterv1.WriteRequest_Stream{Stream: d}
-               case *measurev1.InternalWriteRequest:
-                       r.Request = &clusterv1.WriteRequest_Measure{Measure: d}
-               default:
-                       err = multierr.Append(err, fmt.Errorf("invalid message 
type %T", d))
+func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) 
(bus.Future, error) {
+       for _, m := range messages {
+               r, err := messageToRequest(topic, m)
+               if err != nil {
+                       err = multierr.Append(err, fmt.Errorf("failed to 
marshal message %T: %w", m, err))
                        continue
                }
                node := m.Node()
-
                sendData := func() (success bool) {
                        if stream, ok := bp.streams[node]; !ok {
                                defer func() {
@@ -149,7 +186,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, message 
...bus.Message) (bus.
                                }
                                errSend := stream.client.Send(r)
                                if errSend != nil {
-                                       err = multierr.Append(errSend, 
fmt.Errorf("failed to send message to node %s: %w", node, err))
+                                       err = multierr.Append(err, 
fmt.Errorf("failed to send message to node %s: %w", node, errSend))
                                        return false
                                }
                                _, errRecv := stream.client.Recv()
@@ -170,7 +207,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, message 
...bus.Message) (bus.
                }
                //nolint: govet
                ctx, cancel := context.WithTimeout(context.Background(), 
30*time.Second)
-               stream, errCreateStream := client.client.Write(ctx)
+               stream, errCreateStream := client.client.Send(ctx)
                if err != nil {
                        err = multierr.Append(err, fmt.Errorf("failed to get 
stream for node %s: %w", node, errCreateStream))
                        continue
@@ -182,5 +219,63 @@ func (bp *batchPublisher) Publish(topic bus.Topic, message 
...bus.Message) (bus.
                _ = sendData()
        }
        //nolint: govet
-       return nil, err
+       return nil, nil
+}
+
+// messageToRequest builds the SendRequest that carries m on topic.
+//
+// The request takes its topic from topic.String(), its message ID from
+// m.ID(), and its body from m.Data() packed into an Any. An error is
+// returned when m is not a remote message, when its data is not a
+// proto.Message, or when the data cannot be packed.
+func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, error) {
+       if !m.IsRemote() {
+               return nil, fmt.Errorf("message %d is not remote", m.ID())
+       }
+       data := m.Data()
+       message, ok := data.(proto.Message)
+       if !ok {
+               // Report the payload's type; the type of m is always bus.Message.
+               return nil, fmt.Errorf("invalid message type %T", data)
+       }
+       anyMessage, err := anypb.New(message)
+       if err != nil {
+               return nil, fmt.Errorf("failed to marshal message %T: %w", data, err)
+       }
+       return &clusterv1.SendRequest{
+               Topic:     topic.String(),
+               MessageId: uint64(m.ID()),
+               Body:      anyMessage,
+       }, nil
+}
+
+// future collects the response streams opened by a Publish call; Get and
+// GetAll read from them in the order they were added.
+type future struct {
+       // clients holds the streams whose response has not been read yet. Get
+       // drops the head of the slice after reading from it.
+       clients []clusterv1.Service_SendClient
+}
+
+// Get reads the response of the oldest outstanding stream and removes that
+// stream from the future, whether or not the read succeeds.
+//
+// It returns io.EOF when no stream is left, the Recv error when the read
+// fails, and an error built from the response's Error field when the remote
+// side answered with a failure instead of a body.
+func (l *future) Get() (bus.Message, error) {
+       if len(l.clients) < 1 {
+               return bus.Message{}, io.EOF
+       }
+       c := l.clients[0]
+       defer func() {
+               l.clients = l.clients[1:]
+       }()
+       resp, err := c.Recv()
+       if err != nil {
+               return bus.Message{}, err
+       }
+       // A failure reply carries only an error text; surface it as an error
+       // so it is not mistaken for a successful message without a body.
+       if resp.Error != "" {
+               return bus.Message{}, fmt.Errorf("message %d failed on the remote node: %s", resp.MessageId, resp.Error)
+       }
+       return bus.NewMessage(bus.MessageID(resp.MessageId), resp.Body), nil
+}
+
+// GetAll reads one response from every outstanding stream and returns the
+// messages that were received, together with the combined error of the
+// streams that failed.
+//
+// The loop is driven by the number of remaining streams, not by an io.EOF
+// result from Get: a stream that ends before answering also yields io.EOF,
+// and must count as a failure of that one stream instead of ending the
+// collection early and dropping the streams behind it.
+func (l *future) GetAll() ([]bus.Message, error) {
+       var globalErr error
+       ret := make([]bus.Message, 0, len(l.clients))
+       // Get removes one stream per call, so this loop always terminates.
+       for len(l.clients) > 0 {
+               m, err := l.Get()
+               switch {
+               case err == nil:
+                       ret = append(ret, m)
+               case errors.Is(err, io.EOF):
+                       globalErr = multierr.Append(globalErr, fmt.Errorf("stream closed before a response was received: %w", err))
+               default:
+                       globalErr = multierr.Append(globalErr, err)
+               }
+       }
+       return ret, globalErr
 }
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index d899162e..bada6711 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -37,6 +37,7 @@ type Queue interface {
 type Client interface {
        run.Unit
        bus.Publisher
+       bus.Broadcaster
        NewBatchPublisher() BatchPublisher
 }
 
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index d7e415da..78833713 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -59,7 +59,7 @@ type server struct {
        creds     credentials.TransportCredentials
        log       *logger.Logger
        ser       *grpclib.Server
-       listeners map[bus.Topic][]bus.MessageListener
+       listeners map[bus.Topic]bus.MessageListener
        *clusterv1.UnimplementedServiceServer
        addr           string
        certFile       string
diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go
index d53d4fe1..c689eef0 100644
--- a/banyand/queue/sub/sub.go
+++ b/banyand/queue/sub/sub.go
@@ -22,15 +22,20 @@ import (
        "io"
 
        "github.com/pkg/errors"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/anypb"
 
        clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        "github.com/apache/skywalking-banyandb/pkg/bus"
 )
 
-// Write implements v1.ServiceServer.
-func (s *server) Write(stream clusterv1.Service_WriteServer) error {
-       reply := func(stream clusterv1.Service_WriteServer) {
-               if errResp := stream.Send(&clusterv1.WriteResponse{}); errResp 
!= nil {
+func (s *server) Send(stream clusterv1.Service_SendServer) error {
+       reply := func(writeEntity *clusterv1.SendRequest, err error, message 
string) {
+               s.log.Error().Stringer("written", 
writeEntity).Err(err).Msg(message)
+               if errResp := stream.Send(&clusterv1.SendResponse{
+                       MessageId: writeEntity.MessageId,
+                       Error:     message,
+               }); errResp != nil {
                        s.log.Err(errResp).Msg("failed to send response")
                }
        }
@@ -47,8 +52,7 @@ func (s *server) Write(stream clusterv1.Service_WriteServer) 
error {
                        return nil
                }
                if err != nil {
-                       s.log.Error().Stringer("written", 
writeEntity).Err(err).Msg("failed to receive message")
-                       reply(stream)
+                       reply(writeEntity, err, "failed to receive message")
                        continue
                }
                if writeEntity.Topic != "" && topic == nil {
@@ -56,14 +60,36 @@ func (s *server) Write(stream 
clusterv1.Service_WriteServer) error {
                        topic = &t
                }
                if topic == nil {
-                       s.log.Error().Stringer("written", 
writeEntity).Msg("topic is empty")
-                       reply(stream)
+                       reply(writeEntity, err, "topic is empty")
                        continue
                }
-               for _, listener := range s.getListeners(*topic) {
-                       _ = 
listener.Rev(bus.NewMessage(bus.MessageID(writeEntity.MessageId), 
writeEntity.Request))
+               listener := s.getListeners(*topic)
+               if listener == nil {
+                       reply(writeEntity, err, "no listener found")
+                       continue
+               }
+               m := 
listener.Rev(bus.NewMessage(bus.MessageID(writeEntity.MessageId), 
writeEntity.Body))
+               if m.Data() == nil {
+                       reply(writeEntity, err, "no response")
+                       continue
+               }
+               message, ok := m.Data().(proto.Message)
+               if !ok {
+                       reply(writeEntity, err, "invalid response")
+                       continue
+               }
+               anyMessage, err := anypb.New(message)
+               if err != nil {
+                       reply(writeEntity, err, "failed to marshal message")
+                       continue
+               }
+               if err := stream.Send(&clusterv1.SendResponse{
+                       MessageId: writeEntity.MessageId,
+                       Body:      anyMessage,
+               }); err != nil {
+                       reply(writeEntity, err, "failed to send response")
+                       continue
                }
-               reply(stream)
        }
 }
 
@@ -71,13 +97,13 @@ func (s *server) Subscribe(topic bus.Topic, listener 
bus.MessageListener) error
        s.listenersLock.Lock()
        defer s.listenersLock.Unlock()
        if _, ok := s.listeners[topic]; !ok {
-               s.listeners[topic] = make([]bus.MessageListener, 0)
+               s.listeners[topic] = listener
+               return nil
        }
-       s.listeners[topic] = append(s.listeners[topic], listener)
-       return nil
+       return errors.New("topic already exists")
 }
 
-func (s *server) getListeners(topic bus.Topic) []bus.MessageListener {
+func (s *server) getListeners(topic bus.Topic) bus.MessageListener {
        s.listenersLock.RLock()
        defer s.listenersLock.RUnlock()
        return s.listeners[topic]
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index e349872e..7a9b34df 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -19,9 +19,12 @@ package stream
 
 import (
        "context"
+       "io"
        "path"
        "time"
 
+       "github.com/pkg/errors"
+
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -33,6 +36,8 @@ import (
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
+var _ Query = (*schemaRepo)(nil)
+
 type schemaRepo struct {
        resourceSchema.Repository
        l        *logger.Logger
@@ -42,7 +47,7 @@ type schemaRepo struct {
 func newSchemaRepo(path string, metadata metadata.Repo,
        bufferSize int64, dbOpts tsdb.DatabaseOpts, l *logger.Logger,
 ) schemaRepo {
-       return schemaRepo{
+       r := schemaRepo{
                l:        l,
                metadata: metadata,
                Repository: resourceSchema.NewRepository(
@@ -51,6 +56,38 @@ func newSchemaRepo(path string, metadata metadata.Repo,
                        newSupplier(path, metadata, bufferSize, dbOpts, l),
                ),
        }
+       r.start()
+       return r
+}
+
+// NewPortableRepository builds a Query backed by a schemaRepo whose
+// Repository comes from resourceSchema.NewPortableRepository and a portable
+// supplier. The repo is started before it is returned.
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) Query {
+       repo := new(schemaRepo)
+       repo.l = l
+       repo.metadata = metadata
+       repo.Repository = resourceSchema.NewPortableRepository(metadata, l, newPortableSupplier(metadata, l))
+       repo.start()
+       return repo
+}
+
+// start calls the repository's Watcher and then registers sr with the
+// metadata repo, under the name "stream", as the handler for the group,
+// stream, index-rule-binding and index-rule kinds.
+func (sr *schemaRepo) start() {
+       watchedKinds := schema.KindGroup | schema.KindStream | schema.KindIndexRuleBinding | schema.KindIndexRule
+       sr.Watcher()
+       sr.metadata.RegisterHandler("stream", watchedKinds, sr)
+}
+
+// Stream looks up the stream identified by metadata. When loadStream does not
+// find it, errStreamNotExist is returned, wrapped with a stack trace.
+func (sr *schemaRepo) Stream(metadata *commonv1.Metadata) (Stream, error) {
+       if found, ok := sr.loadStream(metadata); ok {
+               return found, nil
+       }
+       return nil, errors.WithStack(errStreamNotExist)
 }
 
 func (sr *schemaRepo) OnAddOrUpdate(m schema.Metadata) {
@@ -156,7 +193,7 @@ func (sr *schemaRepo) loadStream(metadata 
*commonv1.Metadata) (*stream, bool) {
        if !ok {
                return nil, false
        }
-       s, ok := r.(*stream)
+       s, ok := r.Delegated().(*stream)
        return s, ok
 }
 
@@ -180,11 +217,11 @@ func newSupplier(path string, metadata metadata.Repo, 
bufferSize int64, dbOpts t
        }
 }
 
-func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec 
resourceSchema.ResourceSpec) (resourceSchema.Resource, error) {
-       streamSchema := spec.Schema.(*databasev1.Stream)
+func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, resource 
resourceSchema.Resource) (io.Closer, error) {
+       streamSchema := resource.Schema().(*databasev1.Stream)
        return openStream(shardNum, db, streamSpec{
                schema:     streamSchema,
-               indexRules: spec.IndexRules,
+               indexRules: resource.IndexRules(),
        }, s.l), nil
 }
 
@@ -222,3 +259,21 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) 
(tsdb.Database, error) {
                }),
                opts)
 }
+
+// portableSupplier resolves stream schemas through the metadata repo. It is
+// the supplier passed to resourceSchema.NewPortableRepository.
+type portableSupplier struct {
+       // metadata is the repo whose stream registry answers ResourceSchema.
+       metadata metadata.Repo
+       // l is stored at construction; the code visible here does not use it.
+       l        *logger.Logger
+}
+
+// newPortableSupplier returns a portableSupplier holding the given metadata
+// repo and logger.
+func newPortableSupplier(metadata metadata.Repo, l *logger.Logger) *portableSupplier {
+       supplier := new(portableSupplier)
+       supplier.metadata, supplier.l = metadata, l
+       return supplier
+}
+
+// ResourceSchema fetches the stream schema named by md from the stream
+// registry of the metadata repo. The lookup runs under a five-second timeout
+// derived from context.Background().
+func (s *portableSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
+       lookupCtx, release := context.WithTimeout(context.Background(), 5*time.Second)
+       defer release()
+       registry := s.metadata.StreamRegistry()
+       return registry.GetStream(lookupCtx, md)
+}
diff --git a/banyand/stream/metadata_test.go b/banyand/stream/metadata_test.go
index dc184d40..8c4f3bfb 100644
--- a/banyand/stream/metadata_test.go
+++ b/banyand/stream/metadata_test.go
@@ -37,6 +37,10 @@ var _ = Describe("Metadata", func() {
        BeforeEach(func() {
                goods = gleak.Goroutines()
                svcs, deferFn = setUp()
+               Eventually(func() bool {
+                       _, ok := svcs.stream.schemaRepo.LoadGroup("default")
+                       return ok
+               }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
        })
 
        AfterEach(func() {
@@ -45,12 +49,6 @@ var _ = Describe("Metadata", func() {
        })
 
        Context("Manage group", func() {
-               It("should pass smoke test", func() {
-                       Eventually(func() bool {
-                               _, ok := 
svcs.stream.schemaRepo.LoadGroup("default")
-                               return ok
-                       }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
-               })
                It("should close the group", func() {
                        deleted, err := 
svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "default")
                        Expect(err).ShouldNot(HaveOccurred())
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index 3af678d0..4d888cfb 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -27,7 +27,6 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
-       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
@@ -62,11 +61,7 @@ type service struct {
 }
 
 func (s *service) Stream(metadata *commonv1.Metadata) (Stream, error) {
-       sm, ok := s.schemaRepo.loadStream(metadata)
-       if !ok {
-               return nil, errors.WithStack(errStreamNotExist)
-       }
-       return sm, nil
+       return s.schemaRepo.Stream(metadata)
 }
 
 func (s *service) FlagSet() *run.FlagSet {
@@ -102,11 +97,6 @@ func (s *service) PreRun(_ context.Context) error {
        path := path.Join(s.root, s.Name())
        observability.UpdatePath(path)
        s.schemaRepo = newSchemaRepo(path, s.metadata, 
int64(s.blockBufferSize), s.dbOpts, s.l)
-       // run a serial watcher
-       s.schemaRepo.Watcher()
-       s.metadata.RegisterHandler("stream", 
schema.KindGroup|schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule,
-               &s.schemaRepo)
-
        s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo)
 
        errWrite := s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index fb60bd64..a989ebb2 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -22,33 +22,24 @@ package stream
 import (
        "context"
 
-       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/banyand/tsdb/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 // a chunk is 1MB.
 const chunkSize = 1 << 20
 
-var _ schema.Resource = (*stream)(nil)
-
 type stream struct {
-       db                     tsdb.Supplier
-       l                      *logger.Logger
-       schema                 *databasev1.Stream
-       indexWriter            *index.Writer
-       name                   string
-       group                  string
-       indexRules             []*databasev1.IndexRule
-       maxObservedModRevision int64
-       shardNum               uint32
-}
-
-func (s *stream) GetMetadata() *commonv1.Metadata {
-       return s.schema.Metadata
+       db          tsdb.Supplier
+       l           *logger.Logger
+       schema      *databasev1.Stream
+       indexWriter *index.Writer
+       name        string
+       group       string
+       indexRules  []*databasev1.IndexRule
+       shardNum    uint32
 }
 
 func (s *stream) GetSchema() *databasev1.Stream {
@@ -59,21 +50,12 @@ func (s *stream) GetIndexRules() []*databasev1.IndexRule {
        return s.indexRules
 }
 
-func (s *stream) MaxObservedModRevision() int64 {
-       return s.maxObservedModRevision
-}
-
-func (s *stream) GetTopN() []*databasev1.TopNAggregation {
-       return nil
-}
-
 func (s *stream) Close() error {
        return nil
 }
 
 func (s *stream) parseSpec() {
        s.name, s.group = s.schema.GetMetadata().GetName(), 
s.schema.GetMetadata().GetGroup()
-       s.maxObservedModRevision = schema.ParseMaxModRevision(s.indexRules)
 }
 
 type streamSpec struct {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 537ac018..96379185 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -3,6 +3,12 @@
 
 ## Table of Contents
 
+- [banyandb/cluster/v1/rpc.proto](#banyandb_cluster_v1_rpc-proto)
+    - [SendRequest](#banyandb-cluster-v1-SendRequest)
+    - [SendResponse](#banyandb-cluster-v1-SendResponse)
+  
+    - [Service](#banyandb-cluster-v1-Service)
+  
 - [banyandb/common/v1/common.proto](#banyandb_common_v1_common-proto)
     - [Group](#banyandb-common-v1-Group)
     - [IntervalRule](#banyandb-common-v1-IntervalRule)
@@ -12,6 +18,12 @@
     - [Catalog](#banyandb-common-v1-Catalog)
     - [IntervalRule.Unit](#banyandb-common-v1-IntervalRule-Unit)
   
+- [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto)
+    - [Node](#banyandb-database-v1-Node)
+    - [Shard](#banyandb-database-v1-Shard)
+  
+    - [Role](#banyandb-database-v1-Role)
+  
 - [banyandb/model/v1/common.proto](#banyandb_model_v1_common-proto)
     - [FieldValue](#banyandb-model-v1-FieldValue)
     - [Float](#banyandb-model-v1-Float)
@@ -24,30 +36,6 @@
   
     - [AggregationFunction](#banyandb-model-v1-AggregationFunction)
   
-- [banyandb/measure/v1/write.proto](#banyandb_measure_v1_write-proto)
-    - [DataPointValue](#banyandb-measure-v1-DataPointValue)
-    - [InternalWriteRequest](#banyandb-measure-v1-InternalWriteRequest)
-    - [WriteRequest](#banyandb-measure-v1-WriteRequest)
-    - [WriteResponse](#banyandb-measure-v1-WriteResponse)
-  
-- [banyandb/stream/v1/write.proto](#banyandb_stream_v1_write-proto)
-    - [ElementValue](#banyandb-stream-v1-ElementValue)
-    - [InternalWriteRequest](#banyandb-stream-v1-InternalWriteRequest)
-    - [WriteRequest](#banyandb-stream-v1-WriteRequest)
-    - [WriteResponse](#banyandb-stream-v1-WriteResponse)
-  
-- [banyandb/cluster/v1/rpc.proto](#banyandb_cluster_v1_rpc-proto)
-    - [WriteRequest](#banyandb-cluster-v1-WriteRequest)
-    - [WriteResponse](#banyandb-cluster-v1-WriteResponse)
-  
-    - [Service](#banyandb-cluster-v1-Service)
-  
-- [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto)
-    - [Node](#banyandb-database-v1-Node)
-    - [Shard](#banyandb-database-v1-Shard)
-  
-    - [Role](#banyandb-database-v1-Role)
-  
 - [banyandb/model/v1/query.proto](#banyandb_model_v1_query-proto)
     - [Condition](#banyandb-model-v1-Condition)
     - [Criteria](#banyandb-model-v1-Criteria)
@@ -180,6 +168,12 @@
     - [TopNRequest](#banyandb-measure-v1-TopNRequest)
     - [TopNResponse](#banyandb-measure-v1-TopNResponse)
   
+- [banyandb/measure/v1/write.proto](#banyandb_measure_v1_write-proto)
+    - [DataPointValue](#banyandb-measure-v1-DataPointValue)
+    - [InternalWriteRequest](#banyandb-measure-v1-InternalWriteRequest)
+    - [WriteRequest](#banyandb-measure-v1-WriteRequest)
+    - [WriteResponse](#banyandb-measure-v1-WriteResponse)
+  
 - [banyandb/measure/v1/rpc.proto](#banyandb_measure_v1_rpc-proto)
     - [MeasureService](#banyandb-measure-v1-MeasureService)
   
@@ -206,6 +200,12 @@
     - [QueryRequest](#banyandb-stream-v1-QueryRequest)
     - [QueryResponse](#banyandb-stream-v1-QueryResponse)
   
+- [banyandb/stream/v1/write.proto](#banyandb_stream_v1_write-proto)
+    - [ElementValue](#banyandb-stream-v1-ElementValue)
+    - [InternalWriteRequest](#banyandb-stream-v1-InternalWriteRequest)
+    - [WriteRequest](#banyandb-stream-v1-WriteRequest)
+    - [WriteResponse](#banyandb-stream-v1-WriteResponse)
+  
 - [banyandb/stream/v1/rpc.proto](#banyandb_stream_v1_rpc-proto)
     - [StreamService](#banyandb-stream-v1-StreamService)
   
@@ -213,6 +213,66 @@
 
 
 
+<a name="banyandb_cluster_v1_rpc-proto"></a>
+<p align="right"><a href="#top">Top</a></p>
+
+## banyandb/cluster/v1/rpc.proto
+
+
+
+<a name="banyandb-cluster-v1-SendRequest"></a>
+
+### SendRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| topic | [string](#string) |  |  |
+| message_id | [uint64](#uint64) |  |  |
+| body | [google.protobuf.Any](#google-protobuf-Any) |  |  |
+
+
+
+
+
+
+<a name="banyandb-cluster-v1-SendResponse"></a>
+
+### SendResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| message_id | [uint64](#uint64) |  |  |
+| error | [string](#string) |  |  |
+| body | [google.protobuf.Any](#google-protobuf-Any) |  |  |
+
+
+
+
+
+ 
+
+ 
+
+ 
+
+
+<a name="banyandb-cluster-v1-Service"></a>
+
+### Service
+
+
+| Method Name | Request Type | Response Type | Description |
+| ----------- | ------------ | ------------- | ------------|
+| Send | [SendRequest](#banyandb-cluster-v1-SendRequest) stream | 
[SendResponse](#banyandb-cluster-v1-SendResponse) stream |  |
+
+ 
+
+
+
 <a name="banyandb_common_v1_common-proto"></a>
 <p align="right"><a href="#top">Top</a></p>
 
@@ -326,136 +386,47 @@ Metadata is for multi-tenant, multi-model use
 
 
 
-<a name="banyandb_model_v1_common-proto"></a>
+<a name="banyandb_database_v1_database-proto"></a>
 <p align="right"><a href="#top">Top</a></p>
 
-## banyandb/model/v1/common.proto
-
-
-
-<a name="banyandb-model-v1-FieldValue"></a>
-
-### FieldValue
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| null | [google.protobuf.NullValue](#google-protobuf-NullValue) |  |  |
-| str | [Str](#banyandb-model-v1-Str) |  |  |
-| int | [Int](#banyandb-model-v1-Int) |  |  |
-| binary_data | [bytes](#bytes) |  |  |
-| float | [Float](#banyandb-model-v1-Float) |  |  |
-
-
-
-
-
-
-<a name="banyandb-model-v1-Float"></a>
-
-### Float
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| value | [double](#double) |  |  |
-
-
-
-
-
-
-<a name="banyandb-model-v1-Int"></a>
-
-### Int
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| value | [int64](#int64) |  |  |
-
-
-
-
-
-
-<a name="banyandb-model-v1-IntArray"></a>
-
-### IntArray
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| value | [int64](#int64) | repeated |  |
-
-
-
-
-
-
-<a name="banyandb-model-v1-Str"></a>
-
-### Str
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| value | [string](#string) |  |  |
-
-
-
-
-
-
-<a name="banyandb-model-v1-StrArray"></a>
-
-### StrArray
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| value | [string](#string) | repeated |  |
-
-
-
+## banyandb/database/v1/database.proto
 
 
 
-<a name="banyandb-model-v1-TagFamilyForWrite"></a>
+<a name="banyandb-database-v1-Node"></a>
 
-### TagFamilyForWrite
+### Node
 
 
 
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
-| tags | [TagValue](#banyandb-model-v1-TagValue) | repeated |  |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  |
+| roles | [Role](#banyandb-database-v1-Role) | repeated |  |
+| grpc_address | [string](#string) |  |  |
+| http_address | [string](#string) |  |  |
+| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
 
 
 
 
 
 
-<a name="banyandb-model-v1-TagValue"></a>
+<a name="banyandb-database-v1-Shard"></a>
 
-### TagValue
+### Shard
 
 
 
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
-| null | [google.protobuf.NullValue](#google-protobuf-NullValue) |  |  |
-| str | [Str](#banyandb-model-v1-Str) |  |  |
-| str_array | [StrArray](#banyandb-model-v1-StrArray) |  |  |
-| int | [Int](#banyandb-model-v1-Int) |  |  |
-| int_array | [IntArray](#banyandb-model-v1-IntArray) |  |  |
-| binary_data | [bytes](#bytes) |  |  |
+| id | [uint64](#uint64) |  |  |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  |
+| catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) |  |  |
+| node | [string](#string) |  |  |
+| total | [uint32](#uint32) |  |  |
+| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
+| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
 
 
 
@@ -464,97 +435,18 @@ Metadata is for multi-tenant, multi-model use
  
 
 
-<a name="banyandb-model-v1-AggregationFunction"></a>
+<a name="banyandb-database-v1-Role"></a>
 
-### AggregationFunction
+### Role
 
 
 | Name | Number | Description |
 | ---- | ------ | ----------- |
-| AGGREGATION_FUNCTION_UNSPECIFIED | 0 |  |
-| AGGREGATION_FUNCTION_MEAN | 1 |  |
-| AGGREGATION_FUNCTION_MAX | 2 |  |
-| AGGREGATION_FUNCTION_MIN | 3 |  |
-| AGGREGATION_FUNCTION_COUNT | 4 |  |
-| AGGREGATION_FUNCTION_SUM | 5 |  |
-
-
- 
-
- 
-
- 
-
-
-
-<a name="banyandb_measure_v1_write-proto"></a>
-<p align="right"><a href="#top">Top</a></p>
-
-## banyandb/measure/v1/write.proto
-
-
-
-<a name="banyandb-measure-v1-DataPointValue"></a>
-
-### DataPointValue
-DataPointValue is the data point for writing. It only contains values.
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  | 
timestamp is in the timeunit of milliseconds. |
-| tag_families | 
[banyandb.model.v1.TagFamilyForWrite](#banyandb-model-v1-TagFamilyForWrite) | 
repeated | the order of tag_families&#39; items match the measure schema |
-| fields | [banyandb.model.v1.FieldValue](#banyandb-model-v1-FieldValue) | 
repeated | the order of fields match the measure schema |
-
-
-
-
-
-
-<a name="banyandb-measure-v1-InternalWriteRequest"></a>
-
-### InternalWriteRequest
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| shard_id | [uint32](#uint32) |  |  |
-| series_hash | [bytes](#bytes) |  |  |
-| entity_values | [banyandb.model.v1.TagValue](#banyandb-model-v1-TagValue) | 
repeated |  |
-| request | [WriteRequest](#banyandb-measure-v1-WriteRequest) |  |  |
-
-
-
-
-
-
-<a name="banyandb-measure-v1-WriteRequest"></a>
-
-### WriteRequest
-WriteRequest is the request contract for write
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  | 
the metadata is required. |
-| data_point | [DataPointValue](#banyandb-measure-v1-DataPointValue) |  | the 
data_point is required. |
-
-
-
-
-
-
-<a name="banyandb-measure-v1-WriteResponse"></a>
-
-### WriteResponse
-WriteResponse is the response contract for write
-
-
-
-
+| ROLE_UNSPECIFIED | 0 |  |
+| ROLE_META | 1 |  |
+| ROLE_DATA | 2 |  |
+| ROLE_LIAISON | 3 |  |
 
- 
 
  
 
@@ -564,178 +456,136 @@ WriteResponse is the response contract for write
 
 
 
-<a name="banyandb_stream_v1_write-proto"></a>
+<a name="banyandb_model_v1_common-proto"></a>
 <p align="right"><a href="#top">Top</a></p>
 
-## banyandb/stream/v1/write.proto
-
-
-
-<a name="banyandb-stream-v1-ElementValue"></a>
-
-### ElementValue
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| element_id | [string](#string) |  | element_id could be span_id of a Span or 
segment_id of a Segment in the context of stream |
-| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  | 
timestamp is in the timeunit of milliseconds. It represents 1) either the start 
time of a Span/Segment, 2) or the timestamp of a log |
-| tag_families | 
[banyandb.model.v1.TagFamilyForWrite](#banyandb-model-v1-TagFamilyForWrite) | 
repeated | the order of tag_families&#39; items match the stream schema |
-
-
-
-
-
-
-<a name="banyandb-stream-v1-InternalWriteRequest"></a>
-
-### InternalWriteRequest
-
-
-
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| shard_id | [uint32](#uint32) |  |  |
-| series_hash | [bytes](#bytes) |  |  |
-| entity_values | [banyandb.model.v1.TagValue](#banyandb-model-v1-TagValue) | 
repeated |  |
-| request | [WriteRequest](#banyandb-stream-v1-WriteRequest) |  |  |
-
-
-
+## banyandb/model/v1/common.proto
 
 
 
-<a name="banyandb-stream-v1-WriteRequest"></a>
+<a name="banyandb-model-v1-FieldValue"></a>
 
-### WriteRequest
+### FieldValue
 
 
 
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
-| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  | 
the metadata is only required in the first write. |
-| element | [ElementValue](#banyandb-stream-v1-ElementValue) |  | the element 
is required. |
+| null | [google.protobuf.NullValue](#google-protobuf-NullValue) |  |  |
+| str | [Str](#banyandb-model-v1-Str) |  |  |
+| int | [Int](#banyandb-model-v1-Int) |  |  |
+| binary_data | [bytes](#bytes) |  |  |
+| float | [Float](#banyandb-model-v1-Float) |  |  |
 
 
 
 
 
 
-<a name="banyandb-stream-v1-WriteResponse"></a>
+<a name="banyandb-model-v1-Float"></a>
 
-### WriteResponse
+### Float
 
 
 
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| value | [double](#double) |  |  |
 
 
 
- 
 
- 
 
- 
 
- 
+<a name="banyandb-model-v1-Int"></a>
 
+### Int
 
 
-<a name="banyandb_cluster_v1_rpc-proto"></a>
-<p align="right"><a href="#top">Top</a></p>
 
-## banyandb/cluster/v1/rpc.proto
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| value | [int64](#int64) |  |  |
 
 
 
-<a name="banyandb-cluster-v1-WriteRequest"></a>
 
-### WriteRequest
 
 
+<a name="banyandb-model-v1-IntArray"></a>
 
-| Field | Type | Label | Description |
-| ----- | ---- | ----- | ----------- |
-| topic | [string](#string) |  |  |
-| message_id | [uint64](#uint64) |  |  |
-| stream | 
[banyandb.stream.v1.InternalWriteRequest](#banyandb-stream-v1-InternalWriteRequest)
 |  |  |
-| measure | 
[banyandb.measure.v1.InternalWriteRequest](#banyandb-measure-v1-InternalWriteRequest)
 |  |  |
+### IntArray
 
 
 
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| value | [int64](#int64) | repeated |  |
 
 
 
-<a name="banyandb-cluster-v1-WriteResponse"></a>
 
-### WriteResponse
 
 
+<a name="banyandb-model-v1-Str"></a>
 
+### Str
 
 
 
- 
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| value | [string](#string) |  |  |
 
- 
 
- 
 
 
-<a name="banyandb-cluster-v1-Service"></a>
 
-### Service
 
+<a name="banyandb-model-v1-StrArray"></a>
 
-| Method Name | Request Type | Response Type | Description |
-| ----------- | ------------ | ------------- | ------------|
-| Write | [WriteRequest](#banyandb-cluster-v1-WriteRequest) stream | 
[WriteResponse](#banyandb-cluster-v1-WriteResponse) stream |  |
+### StrArray
 
- 
 
 
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| value | [string](#string) | repeated |  |
 
-<a name="banyandb_database_v1_database-proto"></a>
-<p align="right"><a href="#top">Top</a></p>
 
-## banyandb/database/v1/database.proto
 
 
 
-<a name="banyandb-database-v1-Node"></a>
 
-### Node
+<a name="banyandb-model-v1-TagFamilyForWrite"></a>
+
+### TagFamilyForWrite
 
 
 
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
-| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  
|
-| roles | [Role](#banyandb-database-v1-Role) | repeated |  |
-| grpc_address | [string](#string) |  |  |
-| http_address | [string](#string) |  |  |
-| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
+| tags | [TagValue](#banyandb-model-v1-TagValue) | repeated |  |
 
 
 
 
 
 
-<a name="banyandb-database-v1-Shard"></a>
+<a name="banyandb-model-v1-TagValue"></a>
 
-### Shard
+### TagValue
 
 
 
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
-| id | [uint64](#uint64) |  |  |
-| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  
|
-| catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) |  |  |
-| node | [string](#string) |  |  |
-| total | [uint32](#uint32) |  |  |
-| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
-| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
+| null | [google.protobuf.NullValue](#google-protobuf-NullValue) |  |  |
+| str | [Str](#banyandb-model-v1-Str) |  |  |
+| str_array | [StrArray](#banyandb-model-v1-StrArray) |  |  |
+| int | [Int](#banyandb-model-v1-Int) |  |  |
+| int_array | [IntArray](#banyandb-model-v1-IntArray) |  |  |
+| binary_data | [bytes](#bytes) |  |  |
 
 
 
@@ -744,18 +594,19 @@ WriteResponse is the response contract for write
  
 
 
-<a name="banyandb-database-v1-Role"></a>
+<a name="banyandb-model-v1-AggregationFunction"></a>
 
-### Role
+### AggregationFunction
 
 
 | Name | Number | Description |
 | ---- | ------ | ----------- |
-| ROLE_UNSPECIFIED | 0 |  |
-| ROLE_META | 1 |  |
-| ROLE_DATA | 2 |  |
-| ROLE_QUERY | 3 |  |
-| ROLE_LIAISON | 4 |  |
+| AGGREGATION_FUNCTION_UNSPECIFIED | 0 |  |
+| AGGREGATION_FUNCTION_MEAN | 1 |  |
+| AGGREGATION_FUNCTION_MAX | 2 |  |
+| AGGREGATION_FUNCTION_MIN | 3 |  |
+| AGGREGATION_FUNCTION_COUNT | 4 |  |
+| AGGREGATION_FUNCTION_SUM | 5 |  |
 
 
  
@@ -2647,6 +2498,83 @@ TopNResponse is the response for a query to the Query 
module.
 
 
 
+<a name="banyandb_measure_v1_write-proto"></a>
+<p align="right"><a href="#top">Top</a></p>
+
+## banyandb/measure/v1/write.proto
+
+
+
+<a name="banyandb-measure-v1-DataPointValue"></a>
+
+### DataPointValue
+DataPointValue is the data point for writing. It only contains values.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  | timestamp is in the timeunit of milliseconds. |
+| tag_families | [banyandb.model.v1.TagFamilyForWrite](#banyandb-model-v1-TagFamilyForWrite) | repeated | the order of tag_families&#39; items match the measure schema |
+| fields | [banyandb.model.v1.FieldValue](#banyandb-model-v1-FieldValue) | repeated | the order of fields match the measure schema |
+
+
+
+
+
+
+<a name="banyandb-measure-v1-InternalWriteRequest"></a>
+
+### InternalWriteRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| shard_id | [uint32](#uint32) |  |  |
+| series_hash | [bytes](#bytes) |  |  |
+| entity_values | [banyandb.model.v1.TagValue](#banyandb-model-v1-TagValue) | repeated |  |
+| request | [WriteRequest](#banyandb-measure-v1-WriteRequest) |  |  |
+
+
+
+
+
+
+<a name="banyandb-measure-v1-WriteRequest"></a>
+
+### WriteRequest
+WriteRequest is the request contract for write
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  | the metadata is required. |
+| data_point | [DataPointValue](#banyandb-measure-v1-DataPointValue) |  | the data_point is required. |
+
+
+
+
+
+
+<a name="banyandb-measure-v1-WriteResponse"></a>
+
+### WriteResponse
+WriteResponse is the response contract for write
+
+
+
+
+
+ 
+
+ 
+
+ 
+
+ 
+
+
+
 <a name="banyandb_measure_v1_rpc-proto"></a>
 <p align="right"><a href="#top">Top</a></p>
 
@@ -2964,6 +2892,83 @@ QueryResponse is the response for a query to the Query 
module.
 
 
 
+<a name="banyandb_stream_v1_write-proto"></a>
+<p align="right"><a href="#top">Top</a></p>
+
+## banyandb/stream/v1/write.proto
+
+
+
+<a name="banyandb-stream-v1-ElementValue"></a>
+
+### ElementValue
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| element_id | [string](#string) |  | element_id could be span_id of a Span or segment_id of a Segment in the context of stream |
+| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  | timestamp is in the timeunit of milliseconds. It represents 1) either the start time of a Span/Segment, 2) or the timestamp of a log |
+| tag_families | [banyandb.model.v1.TagFamilyForWrite](#banyandb-model-v1-TagFamilyForWrite) | repeated | the order of tag_families&#39; items match the stream schema |
+
+
+
+
+
+
+<a name="banyandb-stream-v1-InternalWriteRequest"></a>
+
+### InternalWriteRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| shard_id | [uint32](#uint32) |  |  |
+| series_hash | [bytes](#bytes) |  |  |
+| entity_values | [banyandb.model.v1.TagValue](#banyandb-model-v1-TagValue) | repeated |  |
+| request | [WriteRequest](#banyandb-stream-v1-WriteRequest) |  |  |
+
+
+
+
+
+
+<a name="banyandb-stream-v1-WriteRequest"></a>
+
+### WriteRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  | the metadata is only required in the first write. |
+| element | [ElementValue](#banyandb-stream-v1-ElementValue) |  | the element is required. |
+
+
+
+
+
+
+<a name="banyandb-stream-v1-WriteResponse"></a>
+
+### WriteResponse
+
+
+
+
+
+
+ 
+
+ 
+
+ 
+
+ 
+
+
+
 <a name="banyandb_stream_v1_rpc-proto"></a>
 <p align="right"><a href="#top">Top</a></p>
 
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index c7a8247b..6ad65ada 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -94,6 +94,11 @@ type Publisher interface {
        Publish(topic Topic, message ...Message) (Future, error)
 }
 
+// Broadcaster allows sending Messages to a Topic and receiving the responses.
+type Broadcaster interface {
+       Broadcast(topic Topic, message Message) ([]Future, error)
+}
+
 type channel chan event
 
 type chType int
diff --git a/pkg/iter/sort/sort.go b/pkg/iter/sort/sort.go
new file mode 100644
index 00000000..b0d67eb5
--- /dev/null
+++ b/pkg/iter/sort/sort.go
@@ -0,0 +1,133 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package sort provides a generic iterator that merges multiple sorted iterators.
+package sort
+
+import (
+       "bytes"
+       "container/heap"
+       "fmt"
+)
+
+// Comparable is an interface that allows sorting of items.
+type Comparable interface {
+       SortedField() []byte
+}
+
+// Iterator is a stream of items of Comparable type.
+type Iterator[T Comparable] interface {
+       Next() bool
+       Val() T
+       Close() error
+}
+
+type container[T Comparable] struct {
+       item T
+       iter Iterator[T]
+}
+
+type containerHeap[T Comparable] struct {
+       items []*container[T]
+       desc  bool
+}
+
+func (h containerHeap[T]) Len() int {
+       return len(h.items)
+}
+
+func (h containerHeap[T]) Less(i, j int) bool {
+       if h.desc {
+               return bytes.Compare(h.items[i].item.SortedField(), 
h.items[j].item.SortedField()) > 0
+       }
+       return bytes.Compare(h.items[i].item.SortedField(), 
h.items[j].item.SortedField()) < 0
+}
+
+func (h containerHeap[T]) Swap(i, j int) {
+       h.items[i], h.items[j] = h.items[j], h.items[i]
+}
+
+func (h *containerHeap[T]) Push(x interface{}) {
+       item := x.(*container[T])
+       h.items = append(h.items, item)
+}
+
+func (h *containerHeap[T]) Pop() interface{} {
+       old := h.items
+       n := len(old)
+       item := old[n-1]
+       h.items = old[0 : n-1]
+       return item
+}
+
+type itemIter[T Comparable] struct {
+       curr  T
+       h     *containerHeap[T]
+       iters []Iterator[T]
+}
+
+// NewItemIter returns a new iterator that merges multiple sorted iterators.
+func NewItemIter[T Comparable](iters []Iterator[T], desc bool) Iterator[T] {
+       var def T
+       it := &itemIter[T]{
+               iters: iters,
+               h:     &containerHeap[T]{items: make([]*container[T], 0), desc: 
desc},
+               curr:  def,
+       }
+       it.initialize()
+       return it
+}
+
+func (it *itemIter[T]) initialize() {
+       heap.Init(it.h)
+       for _, iter := range it.iters {
+               if iter.Next() {
+                       heap.Push(it.h, &container[T]{item: iter.Val(), iter: 
iter})
+               }
+       }
+}
+
+func (it *itemIter[T]) pushIterator(iter Iterator[T]) {
+       if iter.Next() {
+               heap.Push(it.h, &container[T]{item: iter.Val(), iter: iter})
+       }
+}
+
+func (it *itemIter[T]) Next() bool {
+       if it.h.Len() == 0 {
+               return false
+       }
+       top := heap.Pop(it.h).(*container[T])
+       it.curr = top.item
+       it.pushIterator(top.iter)
+       return true
+}
+
+func (it *itemIter[T]) Val() T {
+       return it.curr
+}
+
+func (it *itemIter[T]) Close() error {
+       var err error
+       for _, iter := range it.iters {
+               if e := iter.Close(); e != nil {
+                       fmt.Println("Error closing iterator:", e)
+                       err = e
+               }
+       }
+       return err
+}
diff --git a/pkg/iter/sort/sort_test.go b/pkg/iter/sort/sort_test.go
new file mode 100644
index 00000000..e91a0f96
--- /dev/null
+++ b/pkg/iter/sort/sort_test.go
@@ -0,0 +1,102 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sort_test
+
+import (
+       "encoding/binary"
+       "testing"
+
+       "github.com/apache/skywalking-banyandb/pkg/iter/sort"
+)
+
+type Int int
+
+func (i Int) SortedField() []byte {
+       b := make([]byte, 8)
+       binary.BigEndian.PutUint64(b, uint64(i))
+       return b
+}
+
+// MockIterator is a mock for the Iterator interface.
+type MockIterator struct {
+       items []Int
+       index int
+}
+
+func NewMockIterator(items []Int) *MockIterator {
+       return &MockIterator{items: items, index: -1}
+}
+
+func (mi *MockIterator) Next() bool {
+       mi.index++
+       return mi.index < len(mi.items)
+}
+
+func (mi *MockIterator) Val() Int {
+       return mi.items[mi.index]
+}
+
+func (mi *MockIterator) Close() error {
+       return nil
+}
+
+func TestItemIter_Ascending(t *testing.T) {
+       iters := []sort.Iterator[Int]{
+               NewMockIterator([]Int{1, 3, 5}),
+               NewMockIterator([]Int{2, 4, 6}),
+       }
+
+       iter := sort.NewItemIter(iters, false) // Ascending order
+       for i := Int(1); i <= 6; i++ {
+               if !iter.Next() {
+                       t.Fatalf("expected Next() to be true, got false")
+               }
+               if val := iter.Val(); val != i {
+                       t.Errorf("expected Val() to be %d, got %d", i, val)
+               }
+       }
+       if iter.Next() {
+               t.Errorf("expected Next() to be false, got true")
+       }
+       if err := iter.Close(); err != nil {
+               t.Errorf("expected Close() to return nil, got error: %v", err)
+       }
+}
+
+func TestItemIter_Descending(t *testing.T) {
+       iters := []sort.Iterator[Int]{
+               NewMockIterator([]Int{5, 3, 1}),
+               NewMockIterator([]Int{6, 4, 2}),
+       }
+
+       iter := sort.NewItemIter(iters, true) // Descending order
+       for i := Int(6); i >= 1; i-- {
+               if !iter.Next() {
+                       t.Fatalf("expected Next() to be true, got false")
+               }
+               if val := iter.Val(); val != i {
+                       t.Errorf("expected Val() to be %d, got %d", i, val)
+               }
+       }
+       if iter.Next() {
+               t.Errorf("expected Next() to be false, got true")
+       }
+       if err := iter.Close(); err != nil {
+               t.Errorf("expected Close() to return nil, got error: %v", err)
+       }
+}
diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go
index b5b184ae..a206293b 100644
--- a/pkg/query/executor/interface.go
+++ b/pkg/query/executor/interface.go
@@ -19,11 +19,14 @@
 package executor
 
 import (
+       "context"
+
        "github.com/apache/skywalking-banyandb/api/common"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
 )
 
 // ExecutionContext allows retrieving data from tsdb.
@@ -39,9 +42,24 @@ type StreamExecutionContext interface {
        ParseElementID(item tsdb.Item) (string, error)
 }
 
+// StreamExecutionContextKey is the key of stream execution context in context.Context.
+type StreamExecutionContextKey struct{}
+
+var streamExecutionContextKeyInstance = StreamExecutionContextKey{}
+
+// WithStreamExecutionContext returns a new context with stream execution context.
+func WithStreamExecutionContext(ctx context.Context, ec 
StreamExecutionContext) context.Context {
+       return context.WithValue(ctx, streamExecutionContextKeyInstance, ec)
+}
+
+// FromStreamExecutionContext returns the stream execution context from context.Context.
+func FromStreamExecutionContext(ctx context.Context) StreamExecutionContext {
+       return 
ctx.Value(streamExecutionContextKeyInstance).(StreamExecutionContext)
+}
+
 // StreamExecutable allows querying in the stream schema.
 type StreamExecutable interface {
-       Execute(StreamExecutionContext) ([]*streamv1.Element, error)
+       Execute(context.Context) ([]*streamv1.Element, error)
 }
 
 // MeasureExecutionContext allows retrieving data through the measure module.
@@ -50,6 +68,21 @@ type MeasureExecutionContext interface {
        ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_Field, 
error)
 }
 
+// MeasureExecutionContextKey is the key of measure execution context in context.Context.
+type MeasureExecutionContextKey struct{}
+
+var measureExecutionContextKeyInstance = MeasureExecutionContextKey{}
+
+// WithMeasureExecutionContext returns a new context with measure execution context.
+func WithMeasureExecutionContext(ctx context.Context, ec 
MeasureExecutionContext) context.Context {
+       return context.WithValue(ctx, measureExecutionContextKeyInstance, ec)
+}
+
+// FromMeasureExecutionContext returns the measure execution context from context.Context.
+func FromMeasureExecutionContext(ctx context.Context) MeasureExecutionContext {
+       return 
ctx.Value(measureExecutionContextKeyInstance).(MeasureExecutionContext)
+}
+
 // MIterator allows iterating in a measure data set.
 type MIterator interface {
        Next() bool
@@ -61,5 +94,26 @@ type MIterator interface {
 
 // MeasureExecutable allows querying in the measure schema.
 type MeasureExecutable interface {
-       Execute(MeasureExecutionContext) (MIterator, error)
+       Execute(context.Context) (MIterator, error)
+}
+
+// DistributedExecutionContext allows retrieving data through the distributed module.
+type DistributedExecutionContext interface {
+       bus.Broadcaster
+       TimeRange() *modelv1.TimeRange
+}
+
+// DistributedExecutionContextKey is the key of distributed execution context in context.Context.
+type DistributedExecutionContextKey struct{}
+
+var distributedExecutionContextKeyInstance = DistributedExecutionContextKey{}
+
+// WithDistributedExecutionContext returns a new context with distributed execution context.
+func WithDistributedExecutionContext(ctx context.Context, ec 
DistributedExecutionContext) context.Context {
+       return context.WithValue(ctx, distributedExecutionContextKeyInstance, 
ec)
+}
+
+// FromDistributedExecutionContext returns the distributed execution context from context.Context.
+func FromDistributedExecutionContext(ctx context.Context) 
DistributedExecutionContext {
+       return 
ctx.Value(distributedExecutionContextKeyInstance).(DistributedExecutionContext)
 }
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 73a46032..4297c296 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -18,7 +18,6 @@
 package logical
 
 import (
-       "bytes"
        "context"
        "io"
        "time"
@@ -28,6 +27,7 @@ import (
 
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/iter/sort"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -48,20 +48,8 @@ type (
        // SeekerBuilder wraps the execution of tsdb.SeekerBuilder.
        // TODO:// we could have a chance to remove this wrapper.
        SeekerBuilder func(builder tsdb.SeekerBuilder)
-
-       comparator func(a, b tsdb.Item) bool
 )
 
-func createComparator(sortDirection modelv1.Sort) comparator {
-       return func(a, b tsdb.Item) bool {
-               comp := bytes.Compare(a.SortedField(), b.SortedField())
-               if sortDirection == modelv1.Sort_SORT_DESC {
-                       return comp == 1
-               }
-               return comp == -1
-       }
-}
-
 // ProjectItem parses the item within the StreamExecutionContext.
 // projectionFieldRefs must be prepared before calling this method, 
projectionFieldRefs should be a list of
 // tag list where the inner list must exist in the same tag family.
@@ -104,16 +92,16 @@ func ProjectItem(ec executor.ExecutionContext, item 
tsdb.Item, projectionFieldRe
 // with the help of Entity. The result is a list of element set, where the 
order of inner list is kept
 // as what the users specify in the seekerBuilder.
 // This method is used by the underlying tableScan and localIndexScan plans.
-func ExecuteForShard(l *logger.Logger, series tsdb.SeriesList, timeRange 
timestamp.TimeRange,
+func ExecuteForShard(ctx context.Context, l *logger.Logger, series 
tsdb.SeriesList, timeRange timestamp.TimeRange,
        builders ...SeekerBuilder,
 ) ([]tsdb.Iterator, []io.Closer, error) {
        var itersInShard []tsdb.Iterator
        var closers []io.Closer
        for _, seriesFound := range series {
                itersInSeries, err := func() ([]tsdb.Iterator, error) {
-                       ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second)
+                       ctxSeries, cancel := context.WithTimeout(ctx, 
5*time.Second)
                        defer cancel()
-                       sp, errInner := seriesFound.Span(context.WithValue(ctx, 
logger.ContextKey, l), timeRange)
+                       sp, errInner := 
seriesFound.Span(context.WithValue(ctxSeries, logger.ContextKey, l), timeRange)
                        if errInner != nil {
                                if errors.Is(errInner, tsdb.ErrEmptySeriesSpan) 
{
                                        return nil, nil
@@ -222,3 +210,15 @@ func StringSlicesEqual(a, b []string) bool {
        }
        return true
 }
+
+// NewItemIter returns an iterator which merges several tsdb.Iterator by the input sorting order.
+func NewItemIter(iters []tsdb.Iterator, s modelv1.Sort) 
sort.Iterator[tsdb.Item] {
+       var ii []sort.Iterator[tsdb.Item]
+       for _, iter := range iters {
+               ii = append(ii, iter)
+       }
+       if s == modelv1.Sort_SORT_DESC {
+               return sort.NewItemIter[tsdb.Item](ii, true)
+       }
+       return sort.NewItemIter[tsdb.Item](ii, false)
+}
diff --git a/pkg/query/logical/index_filter.go 
b/pkg/query/logical/index_filter.go
index 0d16d180..4ab9ff01 100644
--- a/pkg/query/logical/index_filter.go
+++ b/pkg/query/logical/index_filter.go
@@ -47,7 +47,9 @@ func (g GlobalIndexError) Error() string { return 
g.IndexRule.String() }
 
 // BuildLocalFilter returns a new index.Filter for local indices.
 // It could parse series Path at the same time.
-func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict 
map[string]int, entity tsdb.Entity) (index.Filter, []tsdb.Entity, error) {
+func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict 
map[string]int,
+       entity tsdb.Entity, mandatoryIndexRule bool,
+) (index.Filter, []tsdb.Entity, error) {
        if criteria == nil {
                return nil, []tsdb.Entity{entity}, nil
        }
@@ -74,6 +76,8 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema 
Schema, entityDict map[
                                }
                        }
                        return parseCondition(cond, indexRule, expr, entity)
+               } else if mandatoryIndexRule {
+                       return nil, nil, 
errors.Wrapf(errUnsupportedConditionOp, "mandatory index rule conf:%s", cond)
                }
                return eNode, []tsdb.Entity{entity}, nil
        case *modelv1.Criteria_Le:
@@ -82,16 +86,16 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema 
Schema, entityDict map[
                        return nil, nil, 
errors.WithMessagef(errInvalidLogicalExpression, "both sides(left and right) of 
[%v] are empty", criteria)
                }
                if le.GetLeft() == nil {
-                       return BuildLocalFilter(le.Right, schema, entityDict, 
entity)
+                       return BuildLocalFilter(le.Right, schema, entityDict, 
entity, mandatoryIndexRule)
                }
                if le.GetRight() == nil {
-                       return BuildLocalFilter(le.Left, schema, entityDict, 
entity)
+                       return BuildLocalFilter(le.Left, schema, entityDict, 
entity, mandatoryIndexRule)
                }
-               left, leftEntities, err := BuildLocalFilter(le.Left, schema, 
entityDict, entity)
+               left, leftEntities, err := BuildLocalFilter(le.Left, schema, 
entityDict, entity, mandatoryIndexRule)
                if err != nil {
                        return nil, nil, err
                }
-               right, rightEntities, err := BuildLocalFilter(le.Right, schema, 
entityDict, entity)
+               right, rightEntities, err := BuildLocalFilter(le.Right, schema, 
entityDict, entity, mandatoryIndexRule)
                if err != nil {
                        return nil, nil, err
                }
diff --git a/pkg/query/logical/iter.go b/pkg/query/logical/iter.go
deleted file mode 100644
index e522969a..00000000
--- a/pkg/query/logical/iter.go
+++ /dev/null
@@ -1,134 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package logical
-
-import (
-       "container/heap"
-       "io"
-
-       "go.uber.org/multierr"
-
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       "github.com/apache/skywalking-banyandb/banyand/tsdb"
-)
-
-var _ ItemIterator = (*itemIter)(nil)
-
-// ItemIterator allow iterating over a tsdb's series.
-type ItemIterator interface {
-       HasNext() bool
-       Next() tsdb.Item
-       io.Closer
-}
-
-var _ heap.Interface = (*containerHeap)(nil)
-
-// container contains both iter and its current item.
-type container struct {
-       c    comparator
-       item tsdb.Item
-       iter tsdb.Iterator
-}
-
-type containerHeap []*container
-
-func (h containerHeap) Len() int           { return len(h) }
-func (h containerHeap) Less(i, j int) bool { return h[i].c(h[i].item, 
h[j].item) }
-func (h containerHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
-
-func (h *containerHeap) Push(x interface{}) {
-       *h = append(*h, x.(*container))
-}
-
-func (h *containerHeap) Pop() interface{} {
-       old := *h
-       n := len(old)
-       x := old[n-1]
-       *h = old[0 : n-1]
-       return x
-}
-
-type itemIter struct {
-       c     comparator
-       h     *containerHeap
-       iters []tsdb.Iterator
-}
-
-// NewItemIter returns a ItemIterator which mergers several tsdb.Iterator by 
input sorting order.
-func NewItemIter(iters []tsdb.Iterator, sort modelv1.Sort) ItemIterator {
-       it := &itemIter{
-               c:     createComparator(sort),
-               iters: iters,
-               h:     &containerHeap{},
-       }
-       it.init()
-       return it
-}
-
-// init function MUST be called while initialization.
-// 1. Move all iterator to the first item by invoking their Next.
-// 2. Load all first items into a slice.
-func (it *itemIter) init() {
-       for _, iter := range it.iters {
-               it.pushIterator(iter)
-       }
-       // heap initialization
-       heap.Init(it.h)
-}
-
-// pushIterator pushes the given iterator into the underlying deque.
-// Status will be immediately checked if the Iterator has a next value.
-// 1 - If not, it will be close at once and will not be added to the slice,
-//
-//     which means inactive iterator does not exist in the deq.
-//
-// 2 - If so, it will be wrapped into a container and push to the deq.
-//
-//     Then we call SliceStable sort to sort the deq.
-func (it *itemIter) pushIterator(iter tsdb.Iterator) {
-       if !iter.Next() {
-               return
-       }
-       heap.Push(it.h, &container{
-               item: iter.Val(),
-               iter: iter,
-               c:    it.c,
-       })
-}
-
-func (it *itemIter) HasNext() bool {
-       return it.h.Len() > 0
-}
-
-func (it *itemIter) Next() tsdb.Item {
-       // 3. Pop up the minimal item through the order value
-       c := heap.Pop(it.h).(*container)
-
-       // 4. Move the iterator whose value is popped in step 3, push the next 
value of this iterator into the slice.
-       it.pushIterator(c.iter)
-
-       return c.item
-}
-
-// Close closes all underlying iterators.
-func (it *itemIter) Close() (err error) {
-       for _, iter := range it.iters {
-               err = multierr.Append(err, iter.Close())
-       }
-       return err
-}
diff --git a/pkg/query/logical/measure/measure_analyzer.go 
b/pkg/query/logical/measure/measure_analyzer.go
index 30a3623d..0e7f0fff 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -22,21 +22,20 @@ import (
        "math"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
-       "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
 const defaultLimit uint32 = 100
 
 // BuildSchema returns Schema loaded from the metadata repository.
-func BuildSchema(measureSchema measure.Measure) (logical.Schema, error) {
-       md := measureSchema.GetSchema()
+func BuildSchema(md *databasev1.Measure, indexRules []*databasev1.IndexRule) 
(logical.Schema, error) {
        md.GetEntity()
 
        ms := &schema{
                common: &logical.CommonSchema{
-                       IndexRules: measureSchema.GetIndexRules(),
+                       IndexRules: indexRules,
                        TagSpecMap: make(map[string]*logical.TagSpec),
                        EntityList: md.GetEntity().GetTagNames(),
                },
@@ -113,6 +112,66 @@ func Analyze(_ context.Context, criteria 
*measurev1.QueryRequest, metadata *comm
        return p, nil
 }
 
+// DistributedAnalyze builds the plan tree of a distributed measure query and
+// resolves it against s.
+//
+// The tree starts at a distributed source and is wrapped, in this order, by the
+// optional group-by, aggregation and top operators and finally by the
+// offset/limit operator. Once analyzed, the requested order and a maximum size
+// are applied through optimize rules: the size is limit+offset for plain
+// queries and math.MaxInt as soon as grouping or aggregation is requested.
+func DistributedAnalyze(criteria *measurev1.QueryRequest, s logical.Schema) (logical.Plan, error) {
+       groupBy := criteria.GetGroupBy()
+       agg := criteria.GetAgg()
+       grouped := groupBy != nil
+
+       var (
+               tagsPerFamily [][]*logical.Tag
+               byEntity      bool
+       )
+       if grouped {
+               families := groupBy.GetTagProjection().GetTagFamilies()
+               tagsPerFamily = make([][]*logical.Tag, len(families))
+               flattened := make([]string, 0)
+               for idx, family := range families {
+                       tagsPerFamily[idx] = logical.NewTags(family.GetName(), family.GetTags()...)
+                       flattened = append(flattened, family.GetTags()...)
+               }
+               // Grouping by exactly the entity tags is flagged for the group-by operator.
+               byEntity = logical.StringSlicesEqual(s.EntityList(), flattened)
+       }
+
+       // The distributed source is the leaf of the tree.
+       root := newUnresolvedDistributed(criteria)
+
+       // A zero limit falls back to the package default.
+       pageSize := criteria.GetLimit()
+       if pageSize == 0 {
+               pageSize = defaultLimit
+       }
+       maxSize := int(pageSize + criteria.GetOffset())
+
+       if grouped {
+               root = newUnresolvedGroupBy(root, tagsPerFamily, byEntity)
+               maxSize = math.MaxInt
+       }
+       if agg != nil {
+               root = newUnresolvedAggregation(root,
+                       logical.NewField(agg.GetFieldName()),
+                       agg.GetFunction(),
+                       grouped,
+               )
+               maxSize = math.MaxInt
+       }
+       if topN := criteria.GetTop(); topN != nil {
+               root = top(root, topN)
+       }
+       root = limit(root, criteria.GetOffset(), pageSize)
+
+       resolved, err := root.Analyze(s)
+       if err != nil {
+               return nil, err
+       }
+       err = logical.ApplyRules(resolved,
+               logical.NewPushDownOrder(criteria.OrderBy),
+               logical.NewPushDownMaxSize(maxSize),
+       )
+       if err != nil {
+               return nil, err
+       }
+       return resolved, nil
+}
+
 // parseFields parses the query request to decide which kind of plan should be 
generated
 // Basically,
 // 1 - If no criteria is given, we can only scan all shards
diff --git a/pkg/query/logical/measure/measure_plan.go 
b/pkg/query/logical/measure/measure_plan.go
index 870e5af9..d9bc6a68 100644
--- a/pkg/query/logical/measure/measure_plan.go
+++ b/pkg/query/logical/measure/measure_plan.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "context"
        "fmt"
 
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
@@ -36,7 +37,7 @@ type limitPlan struct {
        limit  uint32
 }
 
-func (l *limitPlan) Execute(ec executor.MeasureExecutionContext) 
(executor.MIterator, error) {
+func (l *limitPlan) Execute(ec context.Context) (executor.MIterator, error) {
        dps, err := l.Parent.Input.(executor.MeasureExecutable).Execute(ec)
        if err != nil {
                return nil, err
diff --git a/pkg/query/logical/measure/measure_plan_aggregation.go 
b/pkg/query/logical/measure/measure_plan_aggregation.go
index 480ff273..e304119e 100644
--- a/pkg/query/logical/measure/measure_plan_aggregation.go
+++ b/pkg/query/logical/measure/measure_plan_aggregation.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "context"
        "fmt"
 
        "github.com/pkg/errors"
@@ -121,7 +122,7 @@ func (g *aggregationPlan[N]) Schema() logical.Schema {
        return g.schema.ProjFields(g.aggregationFieldRef)
 }
 
-func (g *aggregationPlan[N]) Execute(ec executor.MeasureExecutionContext) 
(executor.MIterator, error) {
+func (g *aggregationPlan[N]) Execute(ec context.Context) (executor.MIterator, 
error) {
        iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec)
        if err != nil {
                return nil, err
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go 
b/pkg/query/logical/measure/measure_plan_distributed.go
new file mode 100644
index 00000000..45ee4c66
--- /dev/null
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -0,0 +1,234 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "fmt"
+
+       "go.uber.org/multierr"
+       "google.golang.org/protobuf/proto"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/iter/sort"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/executor"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
+
+// unresolvedDistributed is the not-yet-analyzed source node of a distributed
+// measure query. It keeps the request it was created from together with the
+// order and size recorded through Sort and Limit.
+type unresolvedDistributed struct {
+       originalQuery     *measurev1.QueryRequest
+       order             *logical.OrderBy
+       maxDataPointsSize int
+}
+
+// newUnresolvedDistributed wraps query into an unresolved distributed plan.
+func newUnresolvedDistributed(query *measurev1.QueryRequest) logical.UnresolvedPlan {
+       return &unresolvedDistributed{
+               originalQuery: query,
+       }
+}
+
+// Limit records the size that Analyze copies into the Limit field of the
+// forwarded query.
+func (ud *unresolvedDistributed) Limit(max int) {
+       ud.maxDataPointsSize = max
+}
+
+// Sort records the order that Analyze copies into the forwarded query.
+func (ud *unresolvedDistributed) Sort(order *logical.OrderBy) {
+       ud.order = order
+}
+
+// Analyze validates the request and resolves it into a distributedPlan.
+//
+// Both the tag projection and the field projection are mandatory. When no
+// order has been recorded through Sort, the resulting plan merges by
+// timestamp. Otherwise the index rule named by the request's OrderBy must be
+// defined in s and reference exactly one tag, whose spec becomes the merge key.
+func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) {
+       if ud.originalQuery.TagProjection == nil {
+               return nil, fmt.Errorf("tag projection is required")
+       }
+       if ud.originalQuery.FieldProjection == nil {
+               return nil, fmt.Errorf("field projection is required")
+       }
+       temp := &measurev1.QueryRequest{
+               TagProjection:   ud.originalQuery.TagProjection,
+               FieldProjection: ud.originalQuery.FieldProjection,
+               Metadata:        ud.originalQuery.Metadata,
+               Criteria:        ud.originalQuery.Criteria,
+               Limit:           uint32(ud.maxDataPointsSize),
+       }
+       // ud.order stays nil unless Sort was called, so it has to be checked
+       // before any of its fields is read.
+       if ud.order == nil {
+               return &distributedPlan{
+                       queryTemplate: temp,
+                       s:             s,
+                       sortByTime:    true,
+               }, nil
+       }
+       temp.OrderBy = &modelv1.QueryOrder{Sort: ud.order.Sort}
+       if ud.order.Index != nil {
+               temp.OrderBy.IndexRuleName = ud.order.Index.Metadata.Name
+       }
+       // The generated getters tolerate a nil OrderBy and yield zero values
+       // instead of panicking on a direct field access.
+       indexRuleName := ud.originalQuery.GetOrderBy().GetIndexRuleName()
+       ok, indexRule := s.IndexRuleDefined(indexRuleName)
+       if !ok {
+               return nil, fmt.Errorf("index rule %s not found", indexRuleName)
+       }
+       if len(indexRule.Tags) != 1 {
+               return nil, fmt.Errorf("index rule %s should have only one tag", indexRuleName)
+       }
+       sortTagSpec := s.FindTagSpecByName(indexRule.Tags[0])
+       if sortTagSpec == nil {
+               return nil, fmt.Errorf("tag %s not found", indexRule.Tags[0])
+       }
+       result := &distributedPlan{
+               queryTemplate: temp,
+               s:             s,
+               sortByTime:    false,
+               sortTagSpec:   *sortTagSpec,
+       }
+       if ud.originalQuery.GetOrderBy().GetSort() == modelv1.Sort_SORT_DESC {
+               result.desc = true
+       }
+       return result, nil
+}
+
+// distributedPlan is the resolved source node of a distributed measure query.
+// It holds the query template that is sent out and the settings used to merge
+// the replies.
+type distributedPlan struct {
+       s             logical.Schema
+       queryTemplate *measurev1.QueryRequest
+       sortTagSpec   logical.TagSpec
+       sortByTime    bool
+       desc          bool
+}
+
+// Execute broadcasts a clone of the query template, stamped with the time range
+// of the distributed execution context, on data.TopicMeasureQuery and merges
+// the data points of every reply that could be fetched. Failures to fetch
+// individual replies are accumulated and returned together with the iterator
+// built from the remaining replies.
+func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, error) {
+       dctx := executor.FromDistributedExecutionContext(ctx)
+       // Clone so that the shared template is never mutated by an execution.
+       query := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest)
+       query.TimeRange = dctx.TimeRange()
+       message := bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query)
+       futures, err := dctx.Broadcast(data.TopicMeasureQuery, message)
+       if err != nil {
+               return nil, err
+       }
+       var (
+               failures error
+               sources  []sort.Iterator[*comparableDataPoint]
+       )
+       for _, future := range futures {
+               reply, getErr := future.Get()
+               if getErr != nil {
+                       failures = multierr.Append(failures, getErr)
+                       continue
+               }
+               response := reply.Data().(*measurev1.QueryResponse)
+               sources = append(sources, newSortableElements(response.DataPoints, t.sortByTime, t.sortTagSpec))
+       }
+       merged := &sortedMIterator{
+               Iterator: sort.NewItemIter[*comparableDataPoint](sources, t.desc),
+       }
+       return merged, failures
+}
+
+// String renders the plan as the "distributed:" prefix followed by the query template.
+func (t *distributedPlan) String() string {
+       template := t.queryTemplate.String()
+       return fmt.Sprintf("distributed:%s", template)
+}
+
+// Children returns an empty list; this plan has no child plans.
+func (t *distributedPlan) Children() []logical.Plan {
+       return []logical.Plan{}
+}
+
+// Schema returns the schema the plan was analyzed against.
+func (t *distributedPlan) Schema() logical.Schema {
+       return t.s
+}
+
+var _ sort.Comparable = (*comparableDataPoint)(nil)
+
+// comparableDataPoint pairs a data point with the encoded key it is ordered by.
+type comparableDataPoint struct {
+       *measurev1.DataPoint
+       sortField []byte
+}
+
+// newComparableElement derives the sort key of e. The key is the timestamp in
+// Unix nanoseconds when sortByTime is set; otherwise it is the marshaled value
+// of the tag located by sortTagSpec. A marshaling failure is returned as-is.
+func newComparableElement(e *measurev1.DataPoint, sortByTime bool, sortTagSpec logical.TagSpec) (*comparableDataPoint, error) {
+       if sortByTime {
+               nanos := uint64(e.Timestamp.AsTime().UnixNano())
+               return &comparableDataPoint{
+                       DataPoint: e,
+                       sortField: convert.Uint64ToBytes(nanos),
+               }, nil
+       }
+       tag := e.TagFamilies[sortTagSpec.TagFamilyIdx].Tags[sortTagSpec.TagIdx]
+       key, err := pbv1.MarshalTagValue(tag.Value)
+       if err != nil {
+               return nil, err
+       }
+       return &comparableDataPoint{
+               DataPoint: e,
+               sortField: key,
+       }, nil
+}
+
+// SortedField returns the encoded sort key computed at construction time.
+func (e *comparableDataPoint) SortedField() []byte {
+       return e.sortField
+}
+
+var _ sort.Iterator[*comparableDataPoint] = (*sortableElements)(nil)
+
+// sortableElements adapts the data points of a single response to
+// sort.Iterator so that they can take part in a merge.
+type sortableElements struct {
+       cur          *comparableDataPoint
+       dataPoints   []*measurev1.DataPoint
+       sortTagSpec  logical.TagSpec
+       index        int
+       isSortByTime bool
+}
+
+// newSortableElements creates an iterator positioned before the first of dataPoints.
+func newSortableElements(dataPoints []*measurev1.DataPoint, isSortByTime bool, sortTagSpec logical.TagSpec) *sortableElements {
+       return &sortableElements{
+               dataPoints:   dataPoints,
+               isSortByTime: isSortByTime,
+               sortTagSpec:  sortTagSpec,
+       }
+}
+
+// Close is a no-op and always returns nil.
+func (*sortableElements) Close() error {
+       return nil
+}
+
+// Next advances to the next data point whose sort key can be built and reports
+// whether Val now holds such a data point.
+func (s *sortableElements) Next() bool {
+       return s.iter(func(e *measurev1.DataPoint) (*comparableDataPoint, error) {
+               return newComparableElement(e, s.isSortByTime, s.sortTagSpec)
+       })
+}
+
+// Val returns the data point selected by the most recent successful Next.
+func (s *sortableElements) Val() *comparableDataPoint {
+       return s.cur
+}
+
+// iter consumes data points until fn converts one successfully, stores it as
+// the current value and returns true. Data points rejected by fn are skipped.
+// It returns false only when no data point is left.
+func (s *sortableElements) iter(fn func(*measurev1.DataPoint) (*comparableDataPoint, error)) bool {
+       for s.index < len(s.dataPoints) {
+               cur, err := fn(s.dataPoints[s.index])
+               s.index++
+               if err != nil {
+                       continue
+               }
+               s.cur = cur
+               // A value was stored, so report it even when it was the last data
+               // point; exhaustion is only signaled by the following call.
+               return true
+       }
+       return false
+}
+
+var _ executor.MIterator = (*sortedMIterator)(nil)
+
+// sortedMIterator exposes a merged iterator of comparable data points as an
+// executor.MIterator.
+type sortedMIterator struct {
+       sort.Iterator[*comparableDataPoint]
+}
+
+// Current returns the data point under the cursor as a one-element slice.
+func (s *sortedMIterator) Current() []*measurev1.DataPoint {
+       point := s.Val().DataPoint
+       return []*measurev1.DataPoint{point}
+}
diff --git a/pkg/query/logical/measure/measure_plan_groupby.go 
b/pkg/query/logical/measure/measure_plan_groupby.go
index 60a907a5..9f62565d 100644
--- a/pkg/query/logical/measure/measure_plan_groupby.go
+++ b/pkg/query/logical/measure/measure_plan_groupby.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "context"
        "fmt"
        "math"
 
@@ -101,14 +102,14 @@ func (g *groupBy) Schema() logical.Schema {
        return g.schema.ProjTags(g.groupByTagsRefs...)
 }
 
-func (g *groupBy) Execute(ec executor.MeasureExecutionContext) 
(executor.MIterator, error) {
+func (g *groupBy) Execute(ec context.Context) (executor.MIterator, error) {
        if g.groupByEntity {
                return g.sort(ec)
        }
        return g.hash(ec)
 }
 
-func (g *groupBy) sort(ec executor.MeasureExecutionContext) 
(executor.MIterator, error) {
+func (g *groupBy) sort(ec context.Context) (executor.MIterator, error) {
        iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec)
        if err != nil {
                return nil, err
@@ -116,7 +117,7 @@ func (g *groupBy) sort(ec executor.MeasureExecutionContext) 
(executor.MIterator,
        return newGroupSortIterator(iter, g.groupByTagsRefs), nil
 }
 
-func (g *groupBy) hash(ec executor.MeasureExecutionContext) (mit 
executor.MIterator, err error) {
+func (g *groupBy) hash(ec context.Context) (mit executor.MIterator, err error) 
{
        iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec)
        if err != nil {
                return nil, err
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go 
b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index 1b45fdfd..7d9a1fa8 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -31,6 +31,7 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/iter/sort"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
@@ -76,7 +77,7 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema) 
(logical.Plan, error)
                // fill AnyEntry by default
                entity[idx] = tsdb.AnyEntry
        }
-       filter, entities, err := logical.BuildLocalFilter(uis.criteria, s, 
entityMap, entity)
+       filter, entities, err := logical.BuildLocalFilter(uis.criteria, s, 
entityMap, entity, true)
        if err != nil {
                return nil, err
        }
@@ -122,7 +123,7 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) {
        i.order = order
 }
 
-func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (mit 
executor.MIterator, err error) {
+func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, 
err error) {
        var orderBy *tsdb.OrderBy
        if i.order.Index != nil {
                orderBy = &tsdb.OrderBy{
@@ -130,6 +131,7 @@ func (i *localIndexScan) Execute(ec 
executor.MeasureExecutionContext) (mit execu
                        Sort:  i.order.Sort,
                }
        }
+       ec := executor.FromMeasureExecutionContext(ctx)
        var seriesList tsdb.SeriesList
        for _, e := range i.entities {
                shards, errInternal := ec.Shards(e)
@@ -139,7 +141,7 @@ func (i *localIndexScan) Execute(ec 
executor.MeasureExecutionContext) (mit execu
                for _, shard := range shards {
                        sl, errInternal := shard.Series().Search(
                                context.WithValue(
-                                       context.Background(),
+                                       ctx,
                                        logger.ContextKey,
                                        i.l,
                                ),
@@ -163,7 +165,7 @@ func (i *localIndexScan) Execute(ec 
executor.MeasureExecutionContext) (mit execu
                })
        }
        // CAVEAT: the order of series list matters when sorting by an index.
-       iters, closers, err := logical.ExecuteForShard(i.l, seriesList, 
i.timeRange, builders...)
+       iters, closers, err := logical.ExecuteForShard(ctx, i.l, seriesList, 
i.timeRange, builders...)
        if err != nil {
                return nil, err
        }
@@ -224,7 +226,7 @@ func indexScan(startTime, endTime time.Time, metadata 
*commonv1.Metadata, projec
 var _ executor.MIterator = (*indexScanIterator)(nil)
 
 type indexScanIterator struct {
-       inner   logical.ItemIterator
+       inner   sort.Iterator[tsdb.Item]
        err     error
        current *measurev1.DataPoint
        context transformContext
@@ -232,7 +234,7 @@ type indexScanIterator struct {
        num     int
 }
 
-func newIndexScanIterator(inner logical.ItemIterator, context 
transformContext, max int) executor.MIterator {
+func newIndexScanIterator(inner sort.Iterator[tsdb.Item], context 
transformContext, max int) executor.MIterator {
        return &indexScanIterator{
                inner:   inner,
                context: context,
@@ -241,10 +243,10 @@ func newIndexScanIterator(inner logical.ItemIterator, 
context transformContext,
 }
 
 func (ism *indexScanIterator) Next() bool {
-       if !ism.inner.HasNext() || ism.err != nil || ism.num > ism.max {
+       if !ism.inner.Next() || ism.err != nil || ism.num > ism.max {
                return false
        }
-       nextItem := ism.inner.Next()
+       nextItem := ism.inner.Val()
        var err error
        if ism.current, err = transform(nextItem, ism.context); err != nil {
                ism.err = multierr.Append(ism.err, err)
diff --git a/pkg/query/logical/measure/measure_plan_top.go 
b/pkg/query/logical/measure/measure_plan_top.go
index 194f8e48..25fc5062 100644
--- a/pkg/query/logical/measure/measure_plan_top.go
+++ b/pkg/query/logical/measure/measure_plan_top.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "context"
        "fmt"
 
        "github.com/pkg/errors"
@@ -90,7 +91,7 @@ func (g *topOp) Schema() logical.Schema {
        return g.Input.Schema()
 }
 
-func (g *topOp) Execute(ec executor.MeasureExecutionContext) (mit 
executor.MIterator, err error) {
+func (g *topOp) Execute(ec context.Context) (mit executor.MIterator, err 
error) {
        iter, err := g.Parent.Input.(executor.MeasureExecutable).Execute(ec)
        if err != nil {
                return nil, err
diff --git a/pkg/query/logical/stream/stream_analyzer.go 
b/pkg/query/logical/stream/stream_analyzer.go
index 4771eeab..919ac9ea 100644
--- a/pkg/query/logical/stream/stream_analyzer.go
+++ b/pkg/query/logical/stream/stream_analyzer.go
@@ -22,8 +22,8 @@ import (
        "fmt"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
-       "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
@@ -31,12 +31,10 @@ import (
 const defaultLimit uint32 = 20
 
 // BuildSchema returns Schema loaded from the metadata repository.
-func BuildSchema(streamSchema stream.Stream) (logical.Schema, error) {
-       sm := streamSchema.GetSchema()
-
+func BuildSchema(sm *databasev1.Stream, indexRules []*databasev1.IndexRule) 
(logical.Schema, error) {
        s := &schema{
                common: &logical.CommonSchema{
-                       IndexRules: streamSchema.GetIndexRules(),
+                       IndexRules: indexRules,
                        TagSpecMap: make(map[string]*logical.TagSpec),
                        EntityList: sm.GetEntity().GetTagNames(),
                },
@@ -77,6 +75,22 @@ func Analyze(_ context.Context, criteria 
*streamv1.QueryRequest, metadata *commo
        return p, nil
 }
 
+// DistributedAnalyze builds the plan tree of a distributed stream query, made
+// of a distributed source wrapped by an offset and then a limit operator, and
+// resolves it against s. A zero limit falls back to the package default.
+func DistributedAnalyze(criteria *streamv1.QueryRequest, s logical.Schema) (logical.Plan, error) {
+       maxElements := defaultLimit
+       if requested := criteria.GetLimit(); requested != 0 {
+               maxElements = requested
+       }
+       source := newUnresolvedDistributed(criteria)
+       skipped := newOffset(source, criteria.GetOffset())
+       root := newLimit(skipped, maxElements)
+       return root.Analyze(s)
+}
+
 var (
        _ logical.Plan           = (*limit)(nil)
        _ logical.UnresolvedPlan = (*limit)(nil)
@@ -93,7 +107,7 @@ type limit struct {
        LimitNum uint32
 }
 
-func (l *limit) Execute(ec executor.StreamExecutionContext) 
([]*streamv1.Element, error) {
+func (l *limit) Execute(ec context.Context) ([]*streamv1.Element, error) {
        entities, err := l.Parent.Input.(executor.StreamExecutable).Execute(ec)
        if err != nil {
                return nil, err
@@ -146,7 +160,7 @@ type offset struct {
        offsetNum uint32
 }
 
-func (l *offset) Execute(ec executor.StreamExecutionContext) 
([]*streamv1.Element, error) {
+func (l *offset) Execute(ec context.Context) ([]*streamv1.Element, error) {
        elements, err := l.Parent.Input.(executor.StreamExecutable).Execute(ec)
        if err != nil {
                return nil, err
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go 
b/pkg/query/logical/stream/stream_plan_distributed.go
new file mode 100644
index 00000000..7fce94cc
--- /dev/null
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -0,0 +1,214 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "context"
+       "fmt"
+
+       "go.uber.org/multierr"
+       "google.golang.org/protobuf/proto"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/iter/sort"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/executor"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
+
+// unresolvedDistributed is the unresolved plan node of a distributed stream query.
+// It only carries the request it was created from.
+type unresolvedDistributed struct {
+       originalQuery *streamv1.QueryRequest
+}
+
+// newUnresolvedDistributed wraps query into an unresolved distributed plan node.
+func newUnresolvedDistributed(query *streamv1.QueryRequest) logical.UnresolvedPlan {
+       ud := new(unresolvedDistributed)
+       ud.originalQuery = query
+       return ud
+}
+
+// Analyze resolves the distributed plan against the schema s.
+//
+// A query template (projection, metadata, criteria, order-by and limit) is
+// derived from the original request. Without an order-by, or with an order-by
+// that names no index rule, the resulting plan sorts by timestamp; otherwise the
+// index rule must be defined in s and cover exactly one tag known to s, and the
+// plan sorts by that tag.
+// An error is returned when the projection is missing or the sort tag cannot
+// be resolved.
+func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error) {
+       if ud.originalQuery.Projection == nil {
+               return nil, fmt.Errorf("projection is required")
+       }
+       limit := ud.originalQuery.GetLimit()
+       if limit == 0 {
+               limit = defaultLimit
+       }
+       // The offset is applied to the merged result (see DistributedAnalyze), so
+       // every queried node has to return enough elements to cover offset+limit.
+       temp := &streamv1.QueryRequest{
+               Projection: ud.originalQuery.Projection,
+               Metadata:   ud.originalQuery.Metadata,
+               Criteria:   ud.originalQuery.Criteria,
+               Limit:      limit + ud.originalQuery.GetOffset(),
+               OrderBy:    ud.originalQuery.OrderBy,
+       }
+       orderBy := ud.originalQuery.OrderBy
+       if orderBy == nil {
+               return &distributedPlan{
+                       queryTemplate: temp,
+                       s:             s,
+                       sortByTime:    true,
+               }, nil
+       }
+       desc := orderBy.Sort == modelv1.Sort_SORT_DESC
+       // NOTE(review): an order-by without an index rule name only carries a
+       // direction, so it is treated as sorting by timestamp instead of failing
+       // the index rule lookup - confirm this matches the local analyzer.
+       if orderBy.IndexRuleName == "" {
+               return &distributedPlan{
+                       queryTemplate: temp,
+                       s:             s,
+                       sortByTime:    true,
+                       desc:          desc,
+               }, nil
+       }
+       ok, indexRule := s.IndexRuleDefined(orderBy.IndexRuleName)
+       if !ok {
+               return nil, fmt.Errorf("index rule %s not found", orderBy.IndexRuleName)
+       }
+       if len(indexRule.Tags) != 1 {
+               return nil, fmt.Errorf("index rule %s should have only one tag", orderBy.IndexRuleName)
+       }
+       sortTagSpec := s.FindTagSpecByName(indexRule.Tags[0])
+       if sortTagSpec == nil {
+               return nil, fmt.Errorf("tag %s not found", indexRule.Tags[0])
+       }
+       return &distributedPlan{
+               queryTemplate: temp,
+               s:             s,
+               sortByTime:    false,
+               sortTagSpec:   *sortTagSpec,
+               desc:          desc,
+       }, nil
+}
+
+// distributedPlan is the resolved plan of a distributed stream query.
+type distributedPlan struct {
+       // s is the schema the plan was resolved against.
+       s logical.Schema
+       // queryTemplate is cloned for every execution before it is broadcast.
+       queryTemplate *streamv1.QueryRequest
+       // sortTagSpec locates the tag to sort by when sortByTime is false.
+       sortTagSpec logical.TagSpec
+       // sortByTime selects the element timestamp as the sort key.
+       sortByTime bool
+       // desc is set when the request asks for descending order.
+       desc bool
+}
+
+// Execute broadcasts the query through the distributed execution context found in
+// ctx and merges the responses.
+//
+// The query template is cloned and stamped with the context's time range. Every
+// response is wrapped into an iterator over its elements and all iterators are
+// merged by sort.NewItemIter using the plan's sort direction.
+// An error is returned if the broadcast fails, if any reply cannot be fetched or
+// if a reply is not a stream query response; no partial result is returned then.
+func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, error) {
+       dctx := executor.FromDistributedExecutionContext(ctx)
+       query := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
+       query.TimeRange = dctx.TimeRange()
+       // A stream request has to be published on the stream query topic.
+       ff, err := dctx.Broadcast(data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+       if err != nil {
+               return nil, err
+       }
+       var allErr error
+       var see []sort.Iterator[*comparableElement]
+       for _, f := range ff {
+               m, getErr := f.Get()
+               if getErr != nil {
+                       allErr = multierr.Append(allErr, getErr)
+                       continue
+               }
+               resp, ok := m.Data().(*streamv1.QueryResponse)
+               if !ok {
+                       allErr = multierr.Append(allErr, fmt.Errorf("unexpected response type %T", m.Data()))
+                       continue
+               }
+               see = append(see, newSortableElements(resp.GetElements(), t.sortByTime, t.sortTagSpec))
+       }
+       // Do not hide failed replies behind a partial result.
+       if allErr != nil {
+               return nil, allErr
+       }
+       iter := sort.NewItemIter[*comparableElement](see, t.desc)
+       var result []*streamv1.Element
+       for iter.Next() {
+               result = append(result, iter.Val().Element)
+       }
+       return result, nil
+}
+
+// String renders the plan as "distributed:" followed by the query template.
+func (t *distributedPlan) String() string {
+       const prefix = "distributed:"
+       return prefix + t.queryTemplate.String()
+}
+
+// Children returns an empty, non-nil slice: the distributed plan has no child plans.
+func (t *distributedPlan) Children() []logical.Plan {
+       return make([]logical.Plan, 0)
+}
+
+// Schema returns the schema the plan was resolved against.
+func (t *distributedPlan) Schema() logical.Schema {
+       return t.s
+}
+
+var _ sort.Comparable = (*comparableElement)(nil)
+
+// comparableElement pairs a stream element with the bytes it is sorted by.
+type comparableElement struct {
+       *streamv1.Element
+       // sortField is computed once in newComparableElement and returned by SortedField.
+       sortField []byte
+}
+
+// newComparableElement computes the sort key of e and wraps both into a
+// comparableElement.
+//
+// With sortByTime the key is the element's timestamp in Unix nanoseconds encoded by
+// convert.Uint64ToBytes. Otherwise the key is the marshaled value of the tag that
+// sortTagSpec points at. An error is returned when the element does not carry
+// that tag (index out of range) or the value cannot be marshaled.
+func newComparableElement(e *streamv1.Element, sortByTime bool, sortTagSpec logical.TagSpec) (*comparableElement, error) {
+       if sortByTime {
+               return &comparableElement{
+                       Element:   e,
+                       sortField: convert.Uint64ToBytes(uint64(e.Timestamp.AsTime().UnixNano())),
+               }, nil
+       }
+       // The element comes from another node's response, so check the indexes
+       // instead of panicking on a response that lacks the sort tag.
+       families := e.GetTagFamilies()
+       if sortTagSpec.TagFamilyIdx < 0 || sortTagSpec.TagFamilyIdx >= len(families) {
+               return nil, fmt.Errorf("tag family index %d out of range, element has %d tag families",
+                       sortTagSpec.TagFamilyIdx, len(families))
+       }
+       tags := families[sortTagSpec.TagFamilyIdx].GetTags()
+       if sortTagSpec.TagIdx < 0 || sortTagSpec.TagIdx >= len(tags) {
+               return nil, fmt.Errorf("tag index %d out of range, tag family has %d tags",
+                       sortTagSpec.TagIdx, len(tags))
+       }
+       sortField, err := pbv1.MarshalTagValue(tags[sortTagSpec.TagIdx].GetValue())
+       if err != nil {
+               return nil, err
+       }
+       return &comparableElement{
+               Element:   e,
+               sortField: sortField,
+       }, nil
+}
+
+// SortedField returns the precomputed sort key of the element.
+func (e *comparableElement) SortedField() []byte {
+       return e.sortField
+}
+
+var _ sort.Iterator[*comparableElement] = (*sortableElements)(nil)
+
+// sortableElements iterates over a slice of stream elements and exposes each one
+// as a comparableElement.
+type sortableElements struct {
+       // cur is the element returned by Val.
+       cur *comparableElement
+       // elements is the slice being iterated.
+       elements []*streamv1.Element
+       // sortTagSpec locates the sort tag when isSortByTime is false.
+       sortTagSpec logical.TagSpec
+       // index is the position of the next element to read.
+       index int
+       // isSortByTime selects the timestamp as the sort key.
+       isSortByTime bool
+}
+
+// newSortableElements creates an iterator over elements positioned before the
+// first one. isSortByTime and sortTagSpec decide how each element's sort key
+// is computed.
+func newSortableElements(elements []*streamv1.Element, isSortByTime bool, sortTagSpec logical.TagSpec) *sortableElements {
+       se := new(sortableElements)
+       se.elements = elements
+       se.isSortByTime = isSortByTime
+       se.sortTagSpec = sortTagSpec
+       return se
+}
+
+// Close does nothing and always returns nil.
+func (*sortableElements) Close() error {
+       return nil
+}
+
+// Next advances the iterator, computing the sort key of the element it moves to
+// with the iterator's own sort settings. It reports what iter reports.
+func (s *sortableElements) Next() bool {
+       wrap := func(el *streamv1.Element) (*comparableElement, error) {
+               return newComparableElement(el, s.isSortByTime, s.sortTagSpec)
+       }
+       return s.iter(wrap)
+}
+
+// Val returns the element most recently loaded by Next.
+func (s *sortableElements) Val() *comparableElement {
+       return s.cur
+}
+
+// iter moves to the next element that fn can convert and stores it in s.cur.
+// Elements for which fn returns an error are skipped.
+// It returns true when an element was loaded and can be read through Val, and
+// false once the elements are exhausted.
+func (s *sortableElements) iter(fn func(*streamv1.Element) (*comparableElement, error)) bool {
+       for s.index < len(s.elements) {
+               cur, err := fn(s.elements[s.index])
+               s.index++
+               if err != nil {
+                       continue
+               }
+               s.cur = cur
+               // An element is available, even if it is the last one in the slice.
+               return true
+       }
+       return false
+}
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_global.go 
b/pkg/query/logical/stream/stream_plan_indexscan_global.go
index 4ec4fc39..ff1abb0c 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_global.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_global.go
@@ -59,14 +59,15 @@ func (t *globalIndexScan) Schema() logical.Schema {
        return t.schema
 }
 
-func (t *globalIndexScan) Execute(ec executor.StreamExecutionContext) 
([]*streamv1.Element, error) {
+func (t *globalIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, 
error) {
+       ec := executor.FromStreamExecutionContext(ctx)
        shards, err := ec.Shards(nil)
        if err != nil {
                return nil, err
        }
        var elements []*streamv1.Element
        for _, shard := range shards {
-               elementsInShard, shardErr := t.executeForShard(ec, shard)
+               elementsInShard, shardErr := t.executeForShard(ctx, ec, shard)
                if shardErr != nil {
                        return elements, shardErr
                }
@@ -75,7 +76,7 @@ func (t *globalIndexScan) Execute(ec 
executor.StreamExecutionContext) ([]*stream
        return elements, nil
 }
 
-func (t *globalIndexScan) executeForShard(ec executor.StreamExecutionContext, 
shard tsdb.Shard) ([]*streamv1.Element, error) {
+func (t *globalIndexScan) executeForShard(ctx context.Context, ec 
executor.StreamExecutionContext, shard tsdb.Shard) ([]*streamv1.Element, error) 
{
        var elementsInShard []*streamv1.Element
        for _, term := range t.expr.Bytes() {
                itemIDs, err := shard.Index().Seek(index.Field{
@@ -98,9 +99,9 @@ func (t *globalIndexScan) executeForShard(ec 
executor.StreamExecutionContext, sh
                                return elementsInShard, errors.WithStack(err)
                        }
                        err = func() error {
-                               ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second)
+                               ctxSeries, cancel := context.WithTimeout(ctx, 
5*time.Second)
                                defer cancel()
-                               item, closer, errInner := series.Get(ctx, 
itemID)
+                               item, closer, errInner := series.Get(ctxSeries, 
itemID)
                                defer func(closer io.Closer) {
                                        if closer != nil {
                                                _ = closer.Close()
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go 
b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 5970e0f8..c011332d 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -62,8 +62,9 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) {
        i.order = order
 }
 
-func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) (elements 
[]*streamv1.Element, err error) {
+func (i *localIndexScan) Execute(ctx context.Context) (elements 
[]*streamv1.Element, err error) {
        var seriesList tsdb.SeriesList
+       ec := executor.FromStreamExecutionContext(ctx)
        for _, e := range i.entities {
                shards, errInternal := ec.Shards(e)
                if errInternal != nil {
@@ -71,7 +72,7 @@ func (i *localIndexScan) Execute(ec 
executor.StreamExecutionContext) (elements [
                }
                for _, shard := range shards {
                        sl, errInternal := 
shard.Series().List(context.WithValue(
-                               context.Background(),
+                               ctx,
                                logger.ContextKey,
                                i.l,
                        ), tsdb.NewPath(e))
@@ -99,7 +100,7 @@ func (i *localIndexScan) Execute(ec 
executor.StreamExecutionContext) (elements [
                        b.Filter(i.filter)
                })
        }
-       iters, closers, err := logical.ExecuteForShard(i.l, seriesList, 
i.timeRange, builders...)
+       iters, closers, err := logical.ExecuteForShard(ctx, i.l, seriesList, 
i.timeRange, builders...)
        if err != nil {
                return nil, err
        }
@@ -121,8 +122,8 @@ func (i *localIndexScan) Execute(ec 
executor.StreamExecutionContext) (elements [
        defer func() {
                err = multierr.Append(err, it.Close())
        }()
-       for it.HasNext() {
-               nextItem := it.Next()
+       for it.Next() {
+               nextItem := it.Val()
                tagFamilies, innerErr := logical.ProjectItem(ec, nextItem, 
i.projectionTagRefs)
                if innerErr != nil {
                        return nil, innerErr
diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go 
b/pkg/query/logical/stream/stream_plan_tag_filter.go
index 4ea0b3bf..f6e2f3f0 100644
--- a/pkg/query/logical/stream/stream_plan_tag_filter.go
+++ b/pkg/query/logical/stream/stream_plan_tag_filter.go
@@ -18,6 +18,7 @@
 package stream
 
 import (
+       "context"
        "errors"
        "fmt"
        "time"
@@ -58,7 +59,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) 
(logical.Plan, error)
                entity[idx] = tsdb.AnyEntry
        }
        var err error
-       ctx.filter, ctx.entities, err = logical.BuildLocalFilter(uis.criteria, 
s, entityDict, entity)
+       ctx.filter, ctx.entities, err = logical.BuildLocalFilter(uis.criteria, 
s, entityDict, entity, false)
        if err != nil {
                var ge logical.GlobalIndexError
                if errors.As(err, &ge) {
@@ -161,7 +162,7 @@ func newTagFilter(s logical.Schema, parent logical.Plan, 
tagFilter logical.TagFi
        }
 }
 
-func (t *tagFilterPlan) Execute(ec executor.StreamExecutionContext) 
([]*streamv1.Element, error) {
+func (t *tagFilterPlan) Execute(ec context.Context) ([]*streamv1.Element, 
error) {
        entities, err := t.parent.(executor.StreamExecutable).Execute(ec)
        if err != nil {
                return nil, err
diff --git a/pkg/run/channel_closer.go b/pkg/run/channel_closer.go
index 54c62707..fa1af90d 100644
--- a/pkg/run/channel_closer.go
+++ b/pkg/run/channel_closer.go
@@ -101,6 +101,7 @@ func (c *ChannelCloser) CloseThenWait() {
                return
        }
 
+       c.cancel()
        c.lock.Lock()
        c.closed = true
        c.lock.Unlock()
@@ -108,7 +109,6 @@ func (c *ChannelCloser) CloseThenWait() {
        c.sender.Done()
        c.sender.Wait()
 
-       c.cancel()
        c.receiver.Done()
        c.receiver.Wait()
 }
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 0b4cc94a..ce96cf39 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -15,13 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package schema implements a framework to sync schema info from the metadata 
repository.
 package schema
 
 import (
        "context"
        "io"
-       "math"
        "sync"
        "sync/atomic"
        "time"
@@ -38,78 +36,56 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-// EventType defines actions of events.
-type EventType uint8
+var _ Resource = (*resourceSpec)(nil)
 
-// EventType support Add/Update and Delete.
-// All events are idempotent.
-const (
-       EventAddOrUpdate EventType = iota
-       EventDelete
-)
-
-// EventKind defines category of events.
-type EventKind uint8
-
-// This framework groups events to a hierarchy. A group is the root node.
-const (
-       EventKindGroup EventKind = iota
-       EventKindResource
-)
+// resourceSpec bundles a resource schema with its index rules, its topN
+// aggregations and a delegated closer.
+type resourceSpec struct {
+       schema       ResourceSchema
+       delegated    io.Closer
+       indexRules   []*databasev1.IndexRule
+       aggregations []*databasev1.TopNAggregation
+}
 
-// Group is the root node, allowing get resources from its sub nodes.
-type Group interface {
-       GetSchema() *commonv1.Group
-       StoreResource(ctx context.Context, resourceSchema ResourceSchema) 
(Resource, error)
-       LoadResource(name string) (Resource, bool)
+func (rs *resourceSpec) Delegated() io.Closer {
+       return rs.delegated
 }
 
-// MetadataEvent is the syncing message between metadata repo and this 
framework.
-type MetadataEvent struct {
-       Metadata *commonv1.Metadata
-       Typ      EventType
-       Kind     EventKind
+func (rs *resourceSpec) Close() error {
+       return rs.delegated.Close()
 }
 
-// ResourceSchema allows get the metadata.
-type ResourceSchema interface {
-       GetMetadata() *commonv1.Metadata
+func (rs *resourceSpec) Schema() ResourceSchema {
+       return rs.schema
 }
 
-// ResourceSpec wraps required fields to open a resource.
-type ResourceSpec struct {
-       Schema ResourceSchema
-       // IndexRules are index rules bound to the Schema
-       IndexRules []*databasev1.IndexRule
-       // Aggregations are topN aggregation bound to the Schema, e.g. 
TopNAggregation
-       Aggregations []*databasev1.TopNAggregation
+func (rs *resourceSpec) IndexRules() []*databasev1.IndexRule {
+       return rs.indexRules
 }
 
-// Resource allows access metadata from a local cache.
-type Resource interface {
-       GetIndexRules() []*databasev1.IndexRule
-       GetTopN() []*databasev1.TopNAggregation
-       MaxObservedModRevision() int64
-       ResourceSchema
-       io.Closer
+func (rs *resourceSpec) TopN() []*databasev1.TopNAggregation {
+       return rs.aggregations
 }
 
-// ResourceSupplier allows open a resource and its embedded tsdb.
-type ResourceSupplier interface {
-       OpenResource(shardNum uint32, db tsdb.Supplier, spec ResourceSpec) 
(Resource, error)
-       ResourceSchema(metdata *commonv1.Metadata) (ResourceSchema, error)
-       OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error)
+func (rs *resourceSpec) maxRevision() int64 {
+       return rs.schema.GetMetadata().GetModRevision()
 }
 
-// Repository is the collection of several hierarchies groups by a "Group".
-type Repository interface {
-       Watcher()
-       SendMetadataEvent(MetadataEvent)
-       StoreGroup(groupMeta *commonv1.Metadata) (*group, error)
-       LoadGroup(name string) (Group, bool)
-       LoadResource(metadata *commonv1.Metadata) (Resource, bool)
-       Close()
-       StopCh() <-chan struct{}
+func (rs *resourceSpec) isNewThan(other *resourceSpec) bool {
+       if other.maxRevision() > rs.maxRevision() {
+               return false
+       }
+       if len(rs.indexRules) != len(other.indexRules) {
+               return false
+       }
+       if len(rs.aggregations) != len(other.aggregations) {
+               return false
+       }
+       if parseMaxModRevision(other.indexRules) > 
parseMaxModRevision(rs.indexRules) {
+               return false
+       }
+       if parseMaxModRevision(other.aggregations) > 
parseMaxModRevision(rs.aggregations) {
+               return false
+       }
+       return true
 }
 
 const defaultWorkerNum = 10
@@ -117,17 +93,28 @@ const defaultWorkerNum = 10
 var _ Repository = (*schemaRepo)(nil)
 
 type schemaRepo struct {
-       metadata         metadata.Repo
-       resourceSupplier ResourceSupplier
-       l                *logger.Logger
-       data             map[string]*group
-       workerCloser     *run.Closer
-       closer           *run.Closer
-       eventCh          chan MetadataEvent
-       workerNum        int
+       metadata               metadata.Repo
+       resourceSupplier       ResourceSupplier
+       resourceSchemaSupplier ResourceSchemaSupplier
+       l                      *logger.Logger
+       data                   map[string]*group
+       closer                 *run.ChannelCloser
+       eventCh                chan MetadataEvent
+       workerNum              int
        sync.RWMutex
 }
 
+// SendMetadataEvent queues event on the repository's event channel.
+// The event is dropped if the closer refuses a new sender, or if the closer's
+// close notification fires before the event is queued.
+func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) {
+       if !sr.closer.AddSender() {
+               return
+       }
+       defer sr.closer.SenderDone()
+       select {
+       case sr.eventCh <- event:
+       case <-sr.closer.CloseNotify():
+       }
+}
+
 // StopCh implements Repository.
 func (sr *schemaRepo) StopCh() <-chan struct{} {
        return sr.closer.CloseNotify()
@@ -140,26 +127,42 @@ func NewRepository(
        resourceSupplier ResourceSupplier,
 ) Repository {
        return &schemaRepo{
-               metadata:         metadata,
-               l:                l,
-               resourceSupplier: resourceSupplier,
-               data:             make(map[string]*group),
-               eventCh:          make(chan MetadataEvent, defaultWorkerNum),
-               workerNum:        defaultWorkerNum,
-               workerCloser:     run.NewCloser(defaultWorkerNum),
-               closer:           run.NewCloser(1),
+               metadata:               metadata,
+               l:                      l,
+               resourceSupplier:       resourceSupplier,
+               resourceSchemaSupplier: resourceSupplier,
+               data:                   make(map[string]*group),
+               eventCh:                make(chan MetadataEvent, 
defaultWorkerNum),
+               workerNum:              defaultWorkerNum,
+               closer:                 run.NewChannelCloser(),
        }
 }
 
-func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) {
-       sr.eventCh <- event
+// NewPortableRepository return a new Repository without tsdb.
+func NewPortableRepository(
+       metadata metadata.Repo,
+       l *logger.Logger,
+       supplier ResourceSchemaSupplier,
+) Repository {
+       return &schemaRepo{
+               metadata:               metadata,
+               l:                      l,
+               resourceSchemaSupplier: supplier,
+               data:                   make(map[string]*group),
+               eventCh:                make(chan MetadataEvent, 
defaultWorkerNum),
+               workerNum:              defaultWorkerNum,
+               closer:                 run.NewChannelCloser(),
+       }
 }
 
 func (sr *schemaRepo) Watcher() {
        for i := 0; i < sr.workerNum; i++ {
                go func() {
+                       if !sr.closer.AddReceiver() {
+                               return
+                       }
                        defer func() {
-                               sr.workerCloser.Done()
+                               sr.closer.ReceiverDone()
                                if err := recover(); err != nil {
                                        sr.l.Warn().Interface("err", 
err).Msg("watching the events")
                                }
@@ -180,7 +183,7 @@ func (sr *schemaRepo) Watcher() {
                                                case EventKindGroup:
                                                        _, err = 
sr.StoreGroup(evt.Metadata)
                                                case EventKindResource:
-                                                       _, err = 
sr.storeResource(evt.Metadata)
+                                                       err = 
sr.storeResource(evt.Metadata)
                                                }
                                        case EventDelete:
                                                switch evt.Kind {
@@ -191,14 +194,15 @@ func (sr *schemaRepo) Watcher() {
                                                }
                                        }
                                        if err != nil && !errors.Is(err, 
schema.ErrClosed) {
-                                               
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. 
retry...")
                                                select {
-                                               case sr.eventCh <- evt:
-                                               case 
<-sr.workerCloser.CloseNotify():
+                                               case <-sr.closer.CloseNotify():
                                                        return
+                                               default:
                                                }
+                                               
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. 
retry...")
+                                               sr.SendMetadataEvent(evt)
                                        }
-                               case <-sr.workerCloser.CloseNotify():
+                               case <-sr.closer.CloseNotify():
                                        return
                                }
                        }
@@ -207,27 +211,30 @@ func (sr *schemaRepo) Watcher() {
 }
 
 func (sr *schemaRepo) StoreGroup(groupMeta *commonv1.Metadata) (*group, error) 
{
-       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-       defer cancel()
-       groupSchema, err := sr.metadata.GroupRegistry().GetGroup(ctx, 
groupMeta.GetName())
-       if err != nil {
-               return nil, err
-       }
-       name := groupSchema.GetMetadata().GetName()
+       name := groupMeta.GetName()
        sr.Lock()
        defer sr.Unlock()
        g, ok := sr.getGroup(name)
        if !ok {
                sr.l.Info().Str("group", name).Msg("creating a tsdb")
-               var db tsdb.Database
-               db, err = sr.resourceSupplier.OpenDB(groupSchema)
-               if err != nil {
+               g = sr.createGroup(name)
+               if err := g.init(name); err != nil {
                        return nil, err
                }
-               g = newGroup(groupSchema, sr.metadata, db, sr.l, 
sr.resourceSupplier)
-               sr.data[name] = g
                return g, nil
        }
+       if !g.isInit() {
+               if err := g.init(name); err != nil {
+                       return nil, err
+               }
+               return g, nil
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+       groupSchema, err := g.metadata.GroupRegistry().GetGroup(ctx, name)
+       if err != nil {
+               return nil, err
+       }
        prevGroupSchema := g.GetSchema()
        if groupSchema.GetMetadata().GetModRevision() <= 
prevGroupSchema.Metadata.ModRevision {
                return g, nil
@@ -236,15 +243,22 @@ func (sr *schemaRepo) StoreGroup(groupMeta 
*commonv1.Metadata) (*group, error) {
        db := g.SupplyTSDB()
        db.Close()
        sr.l.Info().Str("group", name).Msg("creating a new tsdb")
-       newDB, err := sr.resourceSupplier.OpenDB(groupSchema)
-       if err != nil {
+       if err := g.init(name); err != nil {
                return nil, err
        }
-       g.setDB(newDB)
-       g.groupSchema.Store(groupSchema)
        return g, nil
 }
 
+// createGroup builds an empty group and registers it in sr.data under name.
+// A portable group is created when the repository has no resource supplier,
+// a regular group otherwise. StoreGroup calls it while holding sr's lock.
+func (sr *schemaRepo) createGroup(name string) (g *group) {
+       if sr.resourceSupplier == nil {
+               g = newPortableGroup(sr.metadata, sr.l)
+       } else {
+               g = newGroup(sr.metadata, sr.l, sr.resourceSupplier)
+       }
+       sr.data[name] = g
+       return g
+}
+
 func (sr *schemaRepo) deleteGroup(groupMeta *commonv1.Metadata) error {
        name := groupMeta.GetName()
        sr.Lock()
@@ -273,7 +287,11 @@ func (sr *schemaRepo) getGroup(name string) (*group, bool) 
{
 func (sr *schemaRepo) LoadGroup(name string) (Group, bool) {
        sr.RLock()
        defer sr.RUnlock()
-       return sr.getGroup(name)
+       g, ok := sr.getGroup(name)
+       if !ok {
+               return nil, false
+       }
+       return g, g.isInit()
 }
 
 func (sr *schemaRepo) LoadResource(metadata *commonv1.Metadata) (Resource, 
bool) {
@@ -284,19 +302,26 @@ func (sr *schemaRepo) LoadResource(metadata 
*commonv1.Metadata) (Resource, bool)
        return g.LoadResource(metadata.Name)
 }
 
-func (sr *schemaRepo) storeResource(metadata *commonv1.Metadata) (Resource, 
error) {
-       group, ok := sr.LoadGroup(metadata.Group)
+func (sr *schemaRepo) storeResource(metadata *commonv1.Metadata) error {
+       g, ok := sr.LoadGroup(metadata.Group)
        if !ok {
                var err error
-               if group, err = sr.StoreGroup(&commonv1.Metadata{Name: 
metadata.Group}); err != nil {
-                       return nil, errors.WithMessagef(err, "create unknown 
group:%s", metadata.Group)
+               if g, err = sr.StoreGroup(&commonv1.Metadata{Name: 
metadata.Group}); err != nil {
+                       return errors.WithMessagef(err, "create unknown 
group:%s", metadata.Group)
                }
        }
-       stm, err := sr.resourceSupplier.ResourceSchema(metadata)
+       stm, err := sr.resourceSchemaSupplier.ResourceSchema(metadata)
        if err != nil {
-               return nil, errors.WithMessage(err, "fails to get the resource")
+               if errors.Is(err, schema.ErrGRPCResourceNotFound) {
+                       if dl := sr.l.Debug(); dl.Enabled() {
+                               dl.Interface("metadata", 
metadata).Msg("resource not found")
+                       }
+                       return nil
+               }
+               return errors.WithMessage(err, "fails to get the resource")
        }
-       return group.StoreResource(context.Background(), stm)
+       _, err = g.(*group).storeResource(context.Background(), stm)
+       return err
 }
 
 func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) error {
@@ -313,7 +338,8 @@ func (sr *schemaRepo) Close() {
                        sr.l.Warn().Interface("err", err).Msg("closing 
resource")
                }
        }()
-       sr.workerCloser.CloseThenWait()
+       sr.closer.CloseThenWait()
+       close(sr.eventCh)
 
        sr.RLock()
        defer sr.RUnlock()
@@ -323,8 +349,6 @@ func (sr *schemaRepo) Close() {
                        sr.l.Err(err).RawJSON("group", 
logger.Proto(g.GetSchema().Metadata)).Msg("closing")
                }
        }
-       sr.closer.Done()
-       sr.closer.CloseThenWait()
 }
 
 var _ Group = (*group)(nil)
@@ -335,14 +359,12 @@ type group struct {
        db               atomic.Value
        groupSchema      atomic.Pointer[commonv1.Group]
        l                *logger.Logger
-       schemaMap        map[string]Resource
+       schemaMap        map[string]*resourceSpec
        mapMutex         sync.RWMutex
 }
 
 func newGroup(
-       groupSchema *commonv1.Group,
        metadata metadata.Repo,
-       db tsdb.Database,
        l *logger.Logger,
        resourceSupplier ResourceSupplier,
 ) *group {
@@ -350,12 +372,49 @@ func newGroup(
                groupSchema:      atomic.Pointer[commonv1.Group]{},
                metadata:         metadata,
                l:                l,
-               schemaMap:        make(map[string]Resource),
+               schemaMap:        make(map[string]*resourceSpec),
                resourceSupplier: resourceSupplier,
        }
+       return g
+}
+
+// newPortableGroup creates a group without a resource supplier.
+// The group schema is left at its zero value.
+func newPortableGroup(
+       metadata metadata.Repo,
+       l *logger.Logger,
+) *group {
+       return &group{
+               metadata:  metadata,
+               l:         l,
+               schemaMap: make(map[string]*resourceSpec),
+       }
+}
+
+// init loads the schema of the group called name from the metadata registry and
+// stores it in g. A group that the registry does not know is not an error: init
+// returns nil without storing anything. Unless the group is portable, the
+// group's database is opened through the resource supplier and stored as well.
+func (g *group) init(name string) error {
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+       groupSchema, err := g.metadata.GroupRegistry().GetGroup(ctx, name)
+       // ErrGRPCResourceNotFound is an error value, not a pointer to an error type,
+       // so it has to be matched with errors.Is, the same way storeResource does.
+       if errors.Is(err, schema.ErrGRPCResourceNotFound) {
+               return nil
+       }
+       if err != nil {
+               return err
+       }
        g.groupSchema.Store(groupSchema)
+       if g.isPortable() {
+               return nil
+       }
+       db, err := g.resourceSupplier.OpenDB(groupSchema)
+       if err != nil {
+               return err
+       }
        g.db.Store(db)
-       return g
+       return nil
+}
+
+func (g *group) isInit() bool {
+       return g.GetSchema() != nil
 }
 
 func (g *group) GetSchema() *commonv1.Group {
@@ -366,46 +425,15 @@ func (g *group) SupplyTSDB() tsdb.Database {
        return g.db.Load().(tsdb.Database)
 }
 
-func (g *group) setDB(db tsdb.Database) {
-       g.db.Store(db)
-}
-
-func (g *group) StoreResource(ctx context.Context, resourceSchema 
ResourceSchema) (Resource, error) {
+func (g *group) storeResource(ctx context.Context, resourceSchema 
ResourceSchema) (Resource, error) {
        g.mapMutex.Lock()
        defer g.mapMutex.Unlock()
-       key := resourceSchema.GetMetadata().GetName()
-       preResource := g.schemaMap[key]
-       var localCtx context.Context
-       var cancel context.CancelFunc
-       if preResource != nil &&
-               resourceSchema.GetMetadata().GetModRevision() <= 
preResource.GetMetadata().GetModRevision() {
-               // we only need to check the max modifications revision 
observed for index rules
-               localCtx, cancel = context.WithTimeout(ctx, 5*time.Second)
-               idxRules, errIndexRules := g.metadata.IndexRules(localCtx, 
resourceSchema.GetMetadata())
-               cancel()
-               if errIndexRules != nil {
-                       return nil, errIndexRules
-               }
-               localCtx, cancel = context.WithTimeout(ctx, 5*time.Second)
-               topNAggrs, errTopN := 
g.metadata.MeasureRegistry().TopNAggregations(localCtx, 
resourceSchema.GetMetadata())
-               cancel()
-               if errTopN != nil {
-                       return nil, errTopN
-               }
-               if len(idxRules) == len(preResource.GetIndexRules()) && 
len(topNAggrs) == len(preResource.GetTopN()) {
-                       maxModRevision := 
int64(math.Max(float64(ParseMaxModRevision(idxRules)), 
float64(ParseMaxModRevision(topNAggrs))))
-                       if preResource.MaxObservedModRevision() >= 
maxModRevision {
-                               return preResource, nil
-                       }
-               }
-       }
-       localCtx, cancel = context.WithTimeout(ctx, 5*time.Second)
+       localCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
        defer cancel()
        idxRules, err := g.metadata.IndexRules(localCtx, 
resourceSchema.GetMetadata())
        if err != nil {
                return nil, err
        }
-
        var topNAggrs []*databasev1.TopNAggregation
        if _, ok := resourceSchema.(*databasev1.Measure); ok {
                localCtx, cancel = context.WithTimeout(ctx, 5*time.Second)
@@ -416,20 +444,28 @@ func (g *group) StoreResource(ctx context.Context, 
resourceSchema ResourceSchema
                        return nil, innerErr
                }
        }
-
-       sm, errTS := 
g.resourceSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, g, 
ResourceSpec{
-               Schema:       resourceSchema,
-               IndexRules:   idxRules,
-               Aggregations: topNAggrs,
-       })
-       if errTS != nil {
-               return nil, errTS
+       resource := &resourceSpec{
+               schema:       resourceSchema,
+               indexRules:   idxRules,
+               aggregations: topNAggrs,
+       }
+       key := resourceSchema.GetMetadata().GetName()
+       preResource := g.schemaMap[key]
+       if preResource != nil && preResource.isNewThan(resource) {
+               return preResource, nil
+       }
+       if !g.isPortable() {
+               sm, errTS := 
g.resourceSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, g, 
resource)
+               if errTS != nil {
+                       return nil, errTS
+               }
+               resource.delegated = sm
        }
-       g.schemaMap[key] = sm
+       g.schemaMap[key] = resource
        if preResource != nil {
                _ = preResource.Close()
        }
-       return sm, nil
+       return resource, nil
 }
 
 func (g *group) deleteResource(metadata *commonv1.Metadata) error {
@@ -445,6 +481,10 @@ func (g *group) deleteResource(metadata 
*commonv1.Metadata) error {
        return nil
 }
 
+// isPortable reports whether the group was built without a resource
+// supplier. init and storeResource skip opening the db and the resource for
+// such a group.
+func (g *group) isPortable() bool {
+       return g.resourceSupplier == nil
+}
+
 func (g *group) LoadResource(name string) (Resource, bool) {
        g.mapMutex.RLock()
        s := g.schemaMap[name]
@@ -461,11 +501,13 @@ func (g *group) close() (err error) {
                err = multierr.Append(err, s.Close())
        }
        g.mapMutex.RUnlock()
+       if !g.isInit() {
+               return nil
+       }
        return multierr.Append(err, g.SupplyTSDB().Close())
 }
 
-// ParseMaxModRevision gives the max revision from resources' metadata.
-func ParseMaxModRevision[T ResourceSchema](indexRules []T) 
(maxRevisionForIdxRules int64) {
+func parseMaxModRevision[T ResourceSchema](indexRules []T) 
(maxRevisionForIdxRules int64) {
        maxRevisionForIdxRules = int64(0)
        for _, idxRule := range indexRules {
                if idxRule.GetMetadata().GetModRevision() > 
maxRevisionForIdxRules {
diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go
new file mode 100644
index 00000000..f19a4230
--- /dev/null
+++ b/pkg/schema/schema.go
@@ -0,0 +1,96 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package schema implements a framework to sync schema info from the metadata repository.
+package schema
+
+import (
+       "io"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
+)
+
+// EventType defines the action an event carries.
+type EventType uint8
+
+// EventType supports Add/Update and Delete.
+// All events are idempotent.
+const (
+       // EventAddOrUpdate is the add-or-update action.
+       EventAddOrUpdate EventType = iota
+       // EventDelete is the delete action.
+       EventDelete
+)
+
+// EventKind defines the category of an event.
+type EventKind uint8
+
+// This framework groups events into a hierarchy. A group is the root node.
+const (
+       // EventKindGroup is the category of events about a group.
+       EventKindGroup EventKind = iota
+       // EventKindResource is the category of events about a resource.
+       EventKindResource
+)
+
+// Group is the root node, allowing resources to be loaded from its sub nodes.
+type Group interface {
+       // GetSchema returns the schema of the group.
+       GetSchema() *commonv1.Group
+       // LoadResource looks up a resource by name; the bool presumably reports
+       // whether the resource was found (verify against the implementation).
+       LoadResource(name string) (Resource, bool)
+}
+
+// MetadataEvent is the syncing message between the metadata repo and this framework.
+type MetadataEvent struct {
+       // Metadata is the metadata the event carries.
+       Metadata *commonv1.Metadata
+       // Typ is the action of the event: EventAddOrUpdate or EventDelete.
+       Typ      EventType
+       // Kind is the category of the event: EventKindGroup or EventKindResource.
+       Kind     EventKind
+}
+
+// ResourceSchema allows getting the metadata of a schema object.
+type ResourceSchema interface {
+       // GetMetadata returns the metadata of the schema.
+       GetMetadata() *commonv1.Metadata
+}
+
+// Resource allows accessing metadata from a local cache.
+type Resource interface {
+       io.Closer
+       // IndexRules returns the index rules of the resource.
+       IndexRules() []*databasev1.IndexRule
+       // TopN returns the TopN aggregations of the resource.
+       TopN() []*databasev1.TopNAggregation
+       // Schema returns the schema of the resource.
+       Schema() ResourceSchema
+       // Delegated returns the closer the resource delegates to; presumably the
+       // value returned by ResourceSupplier.OpenResource (verify against the
+       // implementation).
+       Delegated() io.Closer
+}
+
+// ResourceSchemaSupplier allows getting a ResourceSchema from the metadata.
+type ResourceSchemaSupplier interface {
+       // ResourceSchema returns the ResourceSchema for the given metadata.
+       ResourceSchema(metadata *commonv1.Metadata) (ResourceSchema, error)
+}
+
+// ResourceSupplier allows opening a resource and its embedded tsdb.
+type ResourceSupplier interface {
+       ResourceSchemaSupplier
+       // OpenResource opens the resource described by spec and returns it as a closer.
+       OpenResource(shardNum uint32, db tsdb.Supplier, spec Resource) 
(io.Closer, error)
+       // OpenDB opens the tsdb database for the given group schema.
+       OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error)
+}
+
+// Repository is the collection of several hierarchies grouped by a "Group".
+// NOTE(review): StoreGroup returns the unexported *group type from an
+// exported interface; confirm this is intended.
+type Repository interface {
+       Watcher()
+       SendMetadataEvent(MetadataEvent)
+       StoreGroup(groupMeta *commonv1.Metadata) (*group, error)
+       LoadGroup(name string) (Group, bool)
+       LoadResource(metadata *commonv1.Metadata) (Resource, bool)
+       Close()
+       StopCh() <-chan struct{}
+}


Reply via email to