This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new f354d0a9 Add Tracker To Object Pool (#508)
f354d0a9 is described below

commit f354d0a9e98b435947feda713eda0b1d9a8b2772
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Tue Aug 6 15:31:29 2024 +0800

    Add Tracker To Object Pool (#508)
    
    * Fix duplicated measure data in a single part
    
    * Add the tracked pool to fix leak issues
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
    
    ---------
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                                         |   7 +
 banyand/internal/storage/index_test.go             |  10 +-
 banyand/liaison/grpc/node.go                       |  10 +
 banyand/liaison/grpc/server.go                     |   1 -
 banyand/liaison/http/server.go                     |   2 -
 banyand/measure/block.go                           |  14 +-
 banyand/measure/block_metadata.go                  |  11 +-
 banyand/measure/block_reader.go                    |  10 +-
 banyand/measure/block_reader_test.go               |  15 +-
 banyand/measure/block_writer.go                    |   6 +-
 banyand/measure/column.go                          |   2 +-
 banyand/measure/column_metadata.go                 |   6 +-
 banyand/measure/datapoints.go                      |  18 +-
 banyand/measure/introducer.go                      |  15 +-
 banyand/measure/part.go                            |  27 +-
 banyand/measure/part_iter.go                       |  10 +-
 banyand/measure/query.go                           |   5 +-
 banyand/measure/tstable.go                         |  16 +
 banyand/measure/tstable_test.go                    |  62 ++++
 banyand/observability/meter_prom.go                |  12 +-
 banyand/observability/metrics_system.go            |  16 +-
 banyand/observability/service.go                   |  16 +-
 banyand/observability/system.go                    | 162 ---------
 banyand/queue/sub/server.go                        |   1 -
 banyand/stream/block.go                            |  14 +-
 banyand/stream/block_metadata.go                   |  11 +-
 banyand/stream/block_reader.go                     |  10 +-
 banyand/stream/block_writer.go                     |   6 +-
 banyand/stream/introducer.go                       |  15 +-
 banyand/stream/part.go                             |   6 +-
 banyand/stream/part_iter.go                        |  10 +-
 banyand/stream/query.go                            |   7 +-
 banyand/stream/tag.go                              |   2 +-
 banyand/stream/tag_metadata.go                     |   6 +-
 banyand/stream/tstable.go                          |  16 +
 pkg/bytes/buffer.go                                |  13 +-
 pkg/encoding/bytes.go                              |   2 +-
 pkg/encoding/encoder.go                            | 292 ---------------
 pkg/encoding/encoder_test.go                       | 392 ---------------------
 pkg/encoding/int.go                                |  11 +-
 pkg/fs/local_file_system.go                        |  10 +-
 pkg/index/inverted/inverted.go                     |   6 +-
 pkg/meter/native/collection.go                     |   2 +
 pkg/node/interface.go                              |  13 +
 pkg/node/maglev.go                                 |  13 +
 pkg/node/round_robin.go                            |  69 +++-
 pkg/node/round_robin_test.go                       |  22 ++
 pkg/pb/v1/series.go                                |  21 --
 pkg/pool/pool.go                                   |  81 +++++
 pkg/test/gmatcher/gmatcher.go                      |  59 ++++
 pkg/test/measure/testdata/groups/exception.json    |  18 +
 pkg/test/measure/testdata/measures/duplicated.json |  42 +++
 test/cases/init.go                                 |   1 +
 test/cases/measure/data/input/duplicated_part.yaml |  25 ++
 test/cases/measure/data/testdata/duplicated.json   | 182 ++++++++++
 test/cases/measure/data/want/duplicated_part.yaml  |  38 ++
 test/cases/measure/measure.go                      |   1 +
 .../distributed/query/query_suite_test.go          |   3 +
 test/integration/etcd/client_test.go               |   3 +
 test/integration/load/load_suite_test.go           |   3 +
 .../standalone/cold_query/query_suite_test.go      |   3 +
 test/integration/standalone/other/measure_test.go  |   3 +
 test/integration/standalone/other/property_test.go |   3 +
 .../standalone/query/query_suite_test.go           |   3 +
 .../query_ondisk/query_ondisk_suite_test.go        |   3 +
 65 files changed, 879 insertions(+), 1015 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ba07da6f..7389aaae 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -35,12 +35,19 @@ Release Notes.
 - Fix a bug that the Stream module didn't support duplicated in index-based 
filtering and sorting
 - Fix the bug that segment's reference count is increased twice when the 
controller try to create an existing segment.
 - Fix a bug where a distributed query would return an empty result if the 
"limit" was set much lower than the "offset".
+- Fix duplicated measure data in a single part.
+- Fix several "sync.Pool" leak issues by adding a tracker to the pool.
 
 ### Documentation
+
 - Introduce new doc menu structure.
 - Add installation on Docker and Kubernetes.
 - Add quick-start guide.
 
+### Chores
+
+Bump up the version of infra e2e framework.
+
 ## 0.6.1
 
 ### Features
diff --git a/banyand/internal/storage/index_test.go 
b/banyand/internal/storage/index_test.go
index 0e61b623..d73886a3 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -33,8 +33,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
-var testSeriesPool pbv1.SeriesPool
-
 func TestSeriesIndex_Primary(t *testing.T) {
        ctx := context.Background()
        path, fn := setUp(require.New(t))
@@ -46,7 +44,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
        }()
        var docs index.Documents
        for i := 0; i < 100; i++ {
-               series := testSeriesPool.Generate()
+               var series pbv1.Series
                series.Subject = "service_instance_latency"
                series.EntityValues = []*modelv1.TagValue{
                        {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
fmt.Sprintf("svc_%d", i)}}},
@@ -64,7 +62,6 @@ func TestSeriesIndex_Primary(t *testing.T) {
                }
                copy(doc.EntityValues, series.Buffer)
                docs = append(docs, doc)
-               testSeriesPool.Release(series)
        }
        require.NoError(t, si.Write(docs))
        // Restart the index
@@ -155,11 +152,10 @@ func TestSeriesIndex_Primary(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        var seriesQueries []*pbv1.Series
                        for i := range tt.entityValues {
-                               seriesQuery := testSeriesPool.Generate()
-                               defer testSeriesPool.Release(seriesQuery)
+                               var seriesQuery pbv1.Series
                                seriesQuery.Subject = tt.subject
                                seriesQuery.EntityValues = tt.entityValues[i]
-                               seriesQueries = append(seriesQueries, 
seriesQuery)
+                               seriesQueries = append(seriesQueries, 
&seriesQuery)
                        }
                        sl, _, err := si.searchPrimary(ctx, seriesQueries, nil)
                        require.NoError(t, err)
diff --git a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go
index b43a3bdb..9f307fc2 100644
--- a/banyand/liaison/grpc/node.go
+++ b/banyand/liaison/grpc/node.go
@@ -18,6 +18,7 @@
 package grpc
 
 import (
+       "fmt"
        "sync"
 
        "github.com/pkg/errors"
@@ -37,6 +38,7 @@ var (
 // together with the shardID calculated from the incoming data.
 type NodeRegistry interface {
        Locate(group, name string, shardID uint32) (string, error)
+       fmt.Stringer
 }
 
 type clusterNodeService struct {
@@ -94,8 +96,16 @@ func (n *clusterNodeService) OnDelete(metadata 
schema.Metadata) {
        }
 }
 
+func (n *clusterNodeService) String() string {
+       return n.sel.String()
+}
+
 type localNodeService struct{}
 
+func (l localNodeService) String() string {
+       return "local"
+}
+
 // NewLocalNodeRegistry creates a local(fake) node registry.
 func NewLocalNodeRegistry() NodeRegistry {
        return localNodeService{}
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index a4d48bb5..2cc0dd73 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -191,7 +191,6 @@ func (s *server) Validate() error {
        if s.enableIngestionAccessLog && s.accessLogRootPath == "" {
                return errAccessLogRootPath
        }
-       observability.UpdateAddress("grpc", s.addr)
        if !s.tls {
                return nil
        }
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index f1b10053..f7b797b2 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -40,7 +40,6 @@ import (
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
-       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/ui"
@@ -102,7 +101,6 @@ func (p *server) Validate() error {
        if p.listenAddr == ":" {
                return errNoAddr
        }
-       observability.UpdateAddress("http", p.listenAddr)
        if p.grpcCert != "" {
                creds, errTLS := credentials.NewClientTLSFromFile(p.grpcCert, 
"")
                if errTLS != nil {
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 001940cc..ae5006e6 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -20,7 +20,6 @@ package measure
 import (
        "slices"
        "sort"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -29,6 +28,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -403,7 +403,7 @@ func generateBlock() *block {
        if v == nil {
                return &block{}
        }
-       return v.(*block)
+       return v
 }
 
 func releaseBlock(b *block) {
@@ -411,7 +411,7 @@ func releaseBlock(b *block) {
        blockPool.Put(b)
 }
 
-var blockPool sync.Pool
+var blockPool = pool.Register[*block]("measure-block")
 
 type blockCursor struct {
        p                   *part
@@ -705,14 +705,14 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
        return true
 }
 
-var blockCursorPool sync.Pool
+var blockCursorPool = pool.Register[*blockCursor]("measure-blockCursor")
 
 func generateBlockCursor() *blockCursor {
        v := blockCursorPool.Get()
        if v == nil {
                return &blockCursor{}
        }
-       return v.(*blockCursor)
+       return v
 }
 
 func releaseBlockCursor(bc *blockCursor) {
@@ -832,7 +832,7 @@ func generateBlockPointer() *blockPointer {
        if v == nil {
                return &blockPointer{}
        }
-       return v.(*blockPointer)
+       return v
 }
 
 func releaseBlockPointer(bi *blockPointer) {
@@ -840,4 +840,4 @@ func releaseBlockPointer(bi *blockPointer) {
        blockPointerPool.Put(bi)
 }
 
-var blockPointerPool sync.Pool
+var blockPointerPool = pool.Register[*blockPointer]("measure-blockPointer")
diff --git a/banyand/measure/block_metadata.go 
b/banyand/measure/block_metadata.go
index 41093282..ff23c4af 100644
--- a/banyand/measure/block_metadata.go
+++ b/banyand/measure/block_metadata.go
@@ -21,11 +21,11 @@ import (
        "errors"
        "fmt"
        "sort"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
@@ -170,7 +170,6 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, 
error) {
                        if err != nil {
                                return nil, fmt.Errorf("cannot unmarshal 
tagFamily name: %w", err)
                        }
-                       // TODO: cache dataBlock
                        tf := &dataBlock{}
                        src, err = tf.unmarshal(src)
                        if err != nil {
@@ -198,7 +197,7 @@ func generateBlockMetadata() *blockMetadata {
        if v == nil {
                return &blockMetadata{}
        }
-       return v.(*blockMetadata)
+       return v
 }
 
 func releaseBlockMetadata(bm *blockMetadata) {
@@ -206,7 +205,7 @@ func releaseBlockMetadata(bm *blockMetadata) {
        blockMetadataPool.Put(bm)
 }
 
-var blockMetadataPool sync.Pool
+var blockMetadataPool = pool.Register[*blockMetadata]("measure-blockMetadata")
 
 type blockMetadataArray struct {
        arr []blockMetadata
@@ -219,14 +218,14 @@ func (bma *blockMetadataArray) reset() {
        bma.arr = bma.arr[:0]
 }
 
-var blockMetadataArrayPool sync.Pool
+var blockMetadataArrayPool = 
pool.Register[*blockMetadataArray]("measure-blockMetadataArray")
 
 func generateBlockMetadataArray() *blockMetadataArray {
        v := blockMetadataArrayPool.Get()
        if v == nil {
                return &blockMetadataArray{}
        }
-       return v.(*blockMetadataArray)
+       return v
 }
 
 func releaseBlockMetadataArray(bma *blockMetadataArray) {
diff --git a/banyand/measure/block_reader.go b/banyand/measure/block_reader.go
index eb221eef..238b6833 100644
--- a/banyand/measure/block_reader.go
+++ b/banyand/measure/block_reader.go
@@ -22,11 +22,11 @@ import (
        "errors"
        "fmt"
        "io"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type seqReader struct {
@@ -70,7 +70,7 @@ func (sr *seqReader) mustReadFull(data []byte) {
 
 func generateSeqReader() *seqReader {
        if v := seqReaderPool.Get(); v != nil {
-               return v.(*seqReader)
+               return v
        }
        return &seqReader{}
 }
@@ -80,7 +80,7 @@ func releaseSeqReader(sr *seqReader) {
        seqReaderPool.Put(sr)
 }
 
-var seqReaderPool sync.Pool
+var seqReaderPool = pool.Register[*seqReader]("measure-seqReader")
 
 type seqReaders struct {
        tagFamilyMetadata map[string]*seqReader
@@ -219,11 +219,11 @@ func (br *blockReader) error() error {
        return br.err
 }
 
-var blockReaderPool sync.Pool
+var blockReaderPool = pool.Register[*blockReader]("measure-blockReader")
 
 func generateBlockReader() *blockReader {
        if v := blockReaderPool.Get(); v != nil {
-               return v.(*blockReader)
+               return v
        }
        return &blockReader{}
 }
diff --git a/banyand/measure/block_reader_test.go 
b/banyand/measure/block_reader_test.go
index 63f3184e..cd576b78 100644
--- a/banyand/measure/block_reader_test.go
+++ b/banyand/measure/block_reader_test.go
@@ -62,6 +62,13 @@ func Test_blockReader_nextBlock(t *testing.T) {
                                {seriesID: 3, count: 1, uncompressedSizeBytes: 
24},
                        },
                },
+               {
+                       name:    "Test with a single part with same ts",
+                       dpsList: []*dataPoints{duplicatedDps},
+                       want: []blockMetadata{
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
24},
+                       },
+               },
                {
                        name:    "Test with multiple parts with same ts",
                        dpsList: []*dataPoints{dpsTS1, dpsTS1},
@@ -77,7 +84,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       verify := func(pp []*part) {
+                       verify := func(t *testing.T, pp []*part) {
                                var pii []*partMergeIter
                                for _, p := range pp {
                                        pmi := &partMergeIter{}
@@ -116,7 +123,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
                                }
                        }
 
-                       t.Run("memory parts", func(_ *testing.T) {
+                       t.Run("memory parts", func(t *testing.T) {
                                var mpp []*memPart
                                defer func() {
                                        for _, mp := range mpp {
@@ -130,7 +137,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
                                        mp.mustInitFromDataPoints(dps)
                                        pp = append(pp, openMemPart(mp))
                                }
-                               verify(pp)
+                               verify(t, pp)
                        })
 
                        t.Run("file parts", func(t *testing.T) {
@@ -158,7 +165,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
                                        fpp = append(fpp, filePW)
                                        pp = append(pp, filePW.p)
                                }
-                               verify(pp)
+                               verify(t, pp)
                        })
                })
        }
diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go
index fd065f1a..4a07291c 100644
--- a/banyand/measure/block_writer.go
+++ b/banyand/measure/block_writer.go
@@ -19,12 +19,12 @@ package measure
 
 import (
        "path/filepath"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type writer struct {
@@ -285,7 +285,7 @@ func generateBlockWriter() *blockWriter {
                        },
                }
        }
-       return v.(*blockWriter)
+       return v
 }
 
 func releaseBlockWriter(bsw *blockWriter) {
@@ -293,4 +293,4 @@ func releaseBlockWriter(bsw *blockWriter) {
        blockWriterPool.Put(bsw)
 }
 
-var blockWriterPool sync.Pool
+var blockWriterPool = pool.Register[*blockWriter]("measure-blockWriter")
diff --git a/banyand/measure/column.go b/banyand/measure/column.go
index eafa0134..35f34ab8 100644
--- a/banyand/measure/column.go
+++ b/banyand/measure/column.go
@@ -112,7 +112,7 @@ func (c *column) mustSeqReadValues(decoder 
*encoding.BytesBlockDecoder, reader *
        }
 }
 
-var bigValuePool bytes.BufferPool
+var bigValuePool = bytes.NewBufferPool("measure-big-value")
 
 type columnFamily struct {
        name    string
diff --git a/banyand/measure/column_metadata.go 
b/banyand/measure/column_metadata.go
index 3f1eb957..3f7102af 100644
--- a/banyand/measure/column_metadata.go
+++ b/banyand/measure/column_metadata.go
@@ -36,11 +36,11 @@ package measure
 
 import (
        "fmt"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type columnMetadata struct {
@@ -148,7 +148,7 @@ func generateColumnFamilyMetadata() *columnFamilyMetadata {
        if v == nil {
                return &columnFamilyMetadata{}
        }
-       return v.(*columnFamilyMetadata)
+       return v
 }
 
 func releaseColumnFamilyMetadata(cfm *columnFamilyMetadata) {
@@ -156,4 +156,4 @@ func releaseColumnFamilyMetadata(cfm *columnFamilyMetadata) 
{
        columnFamilyMetadataPool.Put(cfm)
 }
 
-var columnFamilyMetadataPool sync.Pool
+var columnFamilyMetadataPool = 
pool.Register[*columnFamilyMetadata]("measure-columnFamilyMetadata")
diff --git a/banyand/measure/datapoints.go b/banyand/measure/datapoints.go
index 03b07093..156d4b45 100644
--- a/banyand/measure/datapoints.go
+++ b/banyand/measure/datapoints.go
@@ -123,6 +123,19 @@ type dataPoints struct {
        fields      []nameValues
 }
 
+func (d *dataPoints) skip(i int) {
+       if len(d.timestamps) <= i {
+               return
+       }
+       d.seriesIDs = append(d.seriesIDs[:i], d.seriesIDs[i+1:]...)
+       d.timestamps = append(d.timestamps[:i], d.timestamps[i+1:]...)
+       d.versions = append(d.versions[:i], d.versions[i+1:]...)
+       d.tagFamilies = append(d.tagFamilies[:i], d.tagFamilies[i+1:]...)
+       if len(d.fields) > 0 {
+               d.fields = append(d.fields[:i], d.fields[i+1:]...)
+       }
+}
+
 func (d *dataPoints) Len() int {
        return len(d.seriesIDs)
 }
@@ -131,7 +144,10 @@ func (d *dataPoints) Less(i, j int) bool {
        if d.seriesIDs[i] != d.seriesIDs[j] {
                return d.seriesIDs[i] < d.seriesIDs[j]
        }
-       return d.timestamps[i] < d.timestamps[j]
+       if d.timestamps[i] != d.timestamps[j] {
+               return d.timestamps[i] < d.timestamps[j]
+       }
+       return d.versions[i] > d.versions[j]
 }
 
 func (d *dataPoints) Swap(i, j int) {
diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go
index 7fce11d1..0bfb2e23 100644
--- a/banyand/measure/introducer.go
+++ b/banyand/measure/introducer.go
@@ -18,8 +18,7 @@
 package measure
 
 import (
-       "sync"
-
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
@@ -33,14 +32,14 @@ func (i *introduction) reset() {
        i.applied = nil
 }
 
-var introductionPool = sync.Pool{}
+var introductionPool = pool.Register[*introduction]("measure-introduction")
 
 func generateIntroduction() *introduction {
        v := introductionPool.Get()
        if v == nil {
                return &introduction{}
        }
-       i := v.(*introduction)
+       i := v
        i.reset()
        return i
 }
@@ -61,7 +60,7 @@ func (i *flusherIntroduction) reset() {
        i.applied = nil
 }
 
-var flusherIntroductionPool = sync.Pool{}
+var flusherIntroductionPool = 
pool.Register[*flusherIntroduction]("measure-flusher-introduction")
 
 func generateFlusherIntroduction() *flusherIntroduction {
        v := flusherIntroductionPool.Get()
@@ -70,7 +69,7 @@ func generateFlusherIntroduction() *flusherIntroduction {
                        flushed: make(map[uint64]*partWrapper),
                }
        }
-       i := v.(*flusherIntroduction)
+       i := v
        i.reset()
        return i
 }
@@ -95,14 +94,14 @@ func (i *mergerIntroduction) reset() {
        i.creator = 0
 }
 
-var mergerIntroductionPool = sync.Pool{}
+var mergerIntroductionPool = 
pool.Register[*mergerIntroduction]("measure-merger-introduction")
 
 func generateMergerIntroduction() *mergerIntroduction {
        v := mergerIntroductionPool.Get()
        if v == nil {
                return &mergerIntroduction{}
        }
-       i := v.(*mergerIntroduction)
+       i := v
        i.reset()
        return i
 }
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index c6383814..41baf29b 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -22,7 +22,6 @@ import (
        "path"
        "path/filepath"
        "sort"
-       "sync"
        "sync/atomic"
        "time"
 
@@ -30,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 const (
@@ -150,16 +150,29 @@ func (mp *memPart) mustInitFromDataPoints(dps 
*dataPoints) {
        var sidPrev common.SeriesID
        uncompressedBlockSizeBytes := uint64(0)
        var indexPrev int
-       for i := range dps.timestamps {
+       var tsPrev int64
+       for i := 0; i < len(dps.timestamps); i++ {
                sid := dps.seriesIDs[i]
                if sidPrev == 0 {
                        sidPrev = sid
                }
 
+               if sid == sidPrev {
+                       if tsPrev == dps.timestamps[i] {
+                               dps.skip(i)
+                               i--
+                               continue
+                       }
+                       tsPrev = dps.timestamps[i]
+               } else {
+                       tsPrev = 0
+               }
+
                if uncompressedBlockSizeBytes >= maxUncompressedBlockSize ||
                        (i-indexPrev) > maxBlockLength || sid != sidPrev {
                        bsw.MustWriteDataPoints(sidPrev, 
dps.timestamps[indexPrev:i], dps.versions[indexPrev:i], 
dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i])
                        sidPrev = sid
+                       tsPrev = 0
                        indexPrev = i
                        uncompressedBlockSizeBytes = 0
                }
@@ -209,7 +222,7 @@ func generateMemPart() *memPart {
        if v == nil {
                return &memPart{}
        }
-       return v.(*memPart)
+       return v
 }
 
 func releaseMemPart(mp *memPart) {
@@ -217,7 +230,7 @@ func releaseMemPart(mp *memPart) {
        memPartPool.Put(mp)
 }
 
-var memPartPool sync.Pool
+var memPartPool = pool.Register[*memPart]("measure-memPart")
 
 type partWrapper struct {
        mp        *memPart
@@ -239,6 +252,12 @@ func (pw *partWrapper) decRef() {
        if n > 0 {
                return
        }
+       if pw.mp != nil {
+               releaseMemPart(pw.mp)
+               pw.mp = nil
+               pw.p = nil
+               return
+       }
        pw.p.close()
        if pw.removable.Load() && pw.p.fileSystem != nil {
                go func(pw *partWrapper) {
diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go
index 0c916334..cf67d320 100644
--- a/banyand/measure/part_iter.go
+++ b/banyand/measure/part_iter.go
@@ -22,7 +22,6 @@ import (
        "fmt"
        "io"
        "sort"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
@@ -30,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type partIter struct {
@@ -327,7 +327,7 @@ func generatePartMergeIter() *partMergeIter {
        if v == nil {
                return &partMergeIter{}
        }
-       return v.(*partMergeIter)
+       return v
 }
 
 func releasePartMergeIter(pmi *partMergeIter) {
@@ -335,7 +335,7 @@ func releasePartMergeIter(pmi *partMergeIter) {
        pmiPool.Put(pmi)
 }
 
-var pmiPool sync.Pool
+var pmiPool = pool.Register[*partMergeIter]("measure-partMergeIter")
 
 type partMergeIterHeap []*partMergeIter
 
@@ -369,7 +369,7 @@ func generateColumnValuesDecoder() 
*encoding.BytesBlockDecoder {
        if v == nil {
                return &encoding.BytesBlockDecoder{}
        }
-       return v.(*encoding.BytesBlockDecoder)
+       return v
 }
 
 func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) {
@@ -377,4 +377,4 @@ func releaseColumnValuesDecoder(d 
*encoding.BytesBlockDecoder) {
        columnValuesDecoderPool.Put(d)
 }
 
-var columnValuesDecoderPool sync.Pool
+var columnValuesDecoderPool = 
pool.Register[*encoding.BytesBlockDecoder]("measure-columnValuesDecoder")
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index fa94091e..c9ac2117 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -242,9 +242,8 @@ func (s *measure) searchBlocks(ctx context.Context, result 
*queryResult, sids []
        defer releaseBlockMetadataArray(bma)
        defFn := startBlockScanSpan(ctx, len(sids), parts, result)
        defer defFn()
-       // TODO: cache tstIter
-       var tstIter tstIter
-       defer tstIter.reset()
+       tstIter := generateTstIter()
+       defer releaseTstIter(tstIter)
        originalSids := make([]common.SeriesID, len(sids))
        copy(originalSids, sids)
        sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index d2a20b4a..033979a8 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -376,6 +377,21 @@ func (ti *tstIter) Error() error {
        return ti.err
 }
 
+func generateTstIter() *tstIter {
+       v := tstIterPool.Get()
+       if v == nil {
+               return &tstIter{}
+       }
+       return v
+}
+
+func releaseTstIter(ti *tstIter) {
+       ti.reset()
+       tstIterPool.Put(ti)
+}
+
+var tstIterPool = pool.Register[*tstIter]("measure-tstIter")
+
 type partIterHeap []*partIter
 
 func (pih *partIterHeap) Len() int {
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index 57ea15a4..386f57b1 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -214,6 +214,16 @@ func Test_tstIter(t *testing.T) {
                                        {seriesID: 3, count: 1, 
uncompressedSizeBytes: 24},
                                },
                        },
+                       {
+                               name:         "Test with a single part with 
same ts",
+                               dpsList:      []*dataPoints{duplicatedDps},
+                               sids:         []common.SeriesID{1},
+                               minTimestamp: 1,
+                               maxTimestamp: 1,
+                               want: []blockMetadata{
+                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 24},
+                               },
+                       },
                        {
                                name:         "Test with multiple parts with 
same ts",
                                dpsList:      []*dataPoints{dpsTS1, dpsTS1},
@@ -568,6 +578,58 @@ var dpsTS2 = &dataPoints{
        },
 }
 
+var duplicatedDps = &dataPoints{
+       seriesIDs:  []common.SeriesID{1, 1, 1},
+       timestamps: []int64{1, 1, 1},
+       versions:   []int64{1, 2, 3},
+       tagFamilies: [][]nameValues{
+               {
+                       {
+                               name: "arrTag", values: []*nameValue{
+                                       {name: "strArrTag", valueType: 
pbv1.ValueTypeStrArr, value: nil, valueArr: [][]byte{[]byte("value1"), 
[]byte("value2")}},
+                                       {name: "intArrTag", valueType: 
pbv1.ValueTypeInt64Arr, value: nil, valueArr: 
[][]byte{convert.Int64ToBytes(25), convert.Int64ToBytes(30)}},
+                               },
+                       },
+                       {
+                               name: "binaryTag", values: []*nameValue{
+                                       {name: "binaryTag", valueType: 
pbv1.ValueTypeBinaryData, value: longText, valueArr: nil},
+                               },
+                       },
+                       {
+                               name: "singleTag", values: []*nameValue{
+                                       {name: "strTag", valueType: 
pbv1.ValueTypeStr, value: []byte("value1"), valueArr: nil},
+                                       {name: "intTag", valueType: 
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(10), valueArr: nil},
+                               },
+                       },
+               },
+               {
+                       {
+                               name: "singleTag", values: []*nameValue{
+                                       {name: "strTag1", valueType: 
pbv1.ValueTypeStr, value: []byte("tag1"), valueArr: nil},
+                                       {name: "strTag2", valueType: 
pbv1.ValueTypeStr, value: []byte("tag2"), valueArr: nil},
+                               },
+                       },
+               },
+               {}, // empty tagFamilies for seriesID 3
+       },
+       fields: []nameValues{
+               {
+                       name: "skipped", values: []*nameValue{
+                               {name: "strField", valueType: 
pbv1.ValueTypeStr, value: []byte("field1"), valueArr: nil},
+                               {name: "intField", valueType: 
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1110), valueArr: nil},
+                               {name: "floatField", valueType: 
pbv1.ValueTypeFloat64, value: convert.Float64ToBytes(1221233.343), valueArr: 
nil},
+                               {name: "binaryField", valueType: 
pbv1.ValueTypeBinaryData, value: longText, valueArr: nil},
+                       },
+               },
+               {}, // empty fields for seriesID 2
+               {
+                       name: "onlyFields", values: []*nameValue{
+                               {name: "intField", valueType: 
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1110), valueArr: nil},
+                       },
+               },
+       },
+}
+
 func generateHugeDps(startTimestamp, endTimestamp, timestamp int64) 
*dataPoints {
        hugeDps := &dataPoints{
                seriesIDs:   []common.SeriesID{},
diff --git a/banyand/observability/meter_prom.go 
b/banyand/observability/meter_prom.go
index 260b442f..ea5d2d8a 100644
--- a/banyand/observability/meter_prom.go
+++ b/banyand/observability/meter_prom.go
@@ -18,6 +18,7 @@
 package observability
 
 import (
+       "net/http"
        "sync"
 
        grpcprom 
"github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
@@ -42,10 +43,6 @@ var (
 func init() {
        reg.MustRegister(collectors.NewGoCollector())
        
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
-       metricsMux.Handle("/metrics", promhttp.HandlerFor(
-               reg,
-               promhttp.HandlerOpts{},
-       ))
 }
 
 // NewMeterProvider returns a meter.Provider based on the given scope.
@@ -53,6 +50,13 @@ func newPromMeterProvider() meter.Provider {
        return prom.NewProvider(SystemScope, reg)
 }
 
+func registerMetricsEndpoint(metricsMux *http.ServeMux) {
+       metricsMux.Handle("/metrics", promhttp.HandlerFor(
+               reg,
+               promhttp.HandlerOpts{},
+       ))
+}
+
 // MetricsServerInterceptor returns a server interceptor for metrics.
 func promMetricsServerInterceptor() (grpc.UnaryServerInterceptor, 
grpc.StreamServerInterceptor) {
        once.Do(func() {
diff --git a/banyand/observability/metrics_system.go 
b/banyand/observability/metrics_system.go
index ae391b73..7717db5d 100644
--- a/banyand/observability/metrics_system.go
+++ b/banyand/observability/metrics_system.go
@@ -55,8 +55,22 @@ var (
        upTimeGauge     meter.Gauge
        diskStateGauge  meter.Gauge
        initMetricsOnce sync.Once
+       diskMap         = sync.Map{}
 )
 
+// UpdatePath updates a path to monitoring its disk usage.
+func UpdatePath(path string) {
+       diskMap.Store(path, nil)
+}
+
+func getPath() (paths []string) {
+       diskMap.Range(func(key, _ any) bool {
+               paths = append(paths, key.(string))
+               return true
+       })
+       return paths
+}
+
 func init() {
        MetricsCollector.Register("cpu", collectCPU)
        MetricsCollector.Register("memory", collectMemory)
@@ -169,7 +183,7 @@ func collectUpTime() {
 }
 
 func collectDisk() {
-       for path := range getPath() {
+       for _, path := range getPath() {
                usage, err := disk.Usage(path)
                if err != nil {
                        if _, err = os.Stat(path); err != nil {
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index 8e41c5ed..4170b09f 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -42,9 +42,8 @@ const (
 )
 
 var (
-       _          run.Service = (*metricService)(nil)
-       _          run.Config  = (*metricService)(nil)
-       metricsMux             = http.NewServeMux()
+       _ run.Service = (*metricService)(nil)
+       _ run.Config  = (*metricService)(nil)
        // MetricsServerInterceptor is the function to obtain grpc metrics 
interceptor.
        MetricsServerInterceptor func() (grpc.UnaryServerInterceptor, 
grpc.StreamServerInterceptor) = emptyMetricsServerInterceptor
 )
@@ -146,6 +145,11 @@ func (p *metricService) Serve() run.StopNotify {
        if err != nil {
                p.l.Fatal().Err(err).Msg("Failed to register metrics collector")
        }
+       metricsMux := http.NewServeMux()
+       metricsMux.HandleFunc("/_route", p.routeTableHandler)
+       if containsMode(p.modes, flagPromethusMode) {
+               registerMetricsEndpoint(metricsMux)
+       }
        if containsMode(p.modes, flagNativeMode) {
                err = p.scheduler.Register("native-metric-collection", 
cron.Descriptor, "@every 5s", func(_ time.Time, _ *logger.Logger) bool {
                        NativeMetricCollection.FlushMetrics()
@@ -180,6 +184,12 @@ func (p *metricService) GracefulStop() {
        p.closer.CloseThenWait()
 }
 
+func (p *metricService) routeTableHandler(w http.ResponseWriter, _ 
*http.Request) {
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       _, _ = w.Write([]byte(p.nodeSelector.String()))
+}
+
 func containsMode(modes []string, mode string) bool {
        for _, item := range modes {
                if item == mode {
diff --git a/banyand/observability/system.go b/banyand/observability/system.go
deleted file mode 100644
index 20aa68b6..00000000
--- a/banyand/observability/system.go
+++ /dev/null
@@ -1,162 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package observability
-
-import (
-       "encoding/json"
-       "net/http"
-       "os"
-       "sync"
-
-       "github.com/shirou/gopsutil/v3/cpu"
-       "github.com/shirou/gopsutil/v3/disk"
-       "github.com/shirou/gopsutil/v3/host"
-       "github.com/shirou/gopsutil/v3/mem"
-
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-)
-
-var systemInfoInstance *SystemInfo
-
-func init() {
-       systemInfoInstance = &SystemInfo{
-               Addresses:  make(map[string]string),
-               DiskUsages: make(map[string]*DiskUsage),
-       }
-}
-
-// UpdateAddress updates the address of the given name.
-func UpdateAddress(name, address string) {
-       systemInfoInstance.Lock()
-       defer systemInfoInstance.Unlock()
-       systemInfoInstance.Addresses[name] = address
-}
-
-func getAddresses() map[string]string {
-       systemInfoInstance.RLock()
-       defer systemInfoInstance.RUnlock()
-       return systemInfoInstance.Addresses
-}
-
-// UpdatePath updates a path to monitoring its disk usage.
-func UpdatePath(path string) {
-       systemInfoInstance.Lock()
-       defer systemInfoInstance.Unlock()
-       systemInfoInstance.DiskUsages[path] = nil
-}
-
-func getPath() map[string]*DiskUsage {
-       systemInfoInstance.RLock()
-       defer systemInfoInstance.RUnlock()
-       return systemInfoInstance.DiskUsages
-}
-
-// SystemInfo represents the system information of a node.
-type SystemInfo struct {
-       Addresses   map[string]string     `json:"addresses"`
-       DiskUsages  map[string]*DiskUsage `json:"disk_usages"`
-       NodeID      string                `json:"node_id"`
-       Hostname    string                `json:"hostname"`
-       Roles       []string              `json:"roles"`
-       Uptime      uint64                `json:"uptime"`
-       CPUUsage    float64               `json:"cpu_usage"`
-       MemoryUsage float64               `json:"memory_usage"`
-       sync.RWMutex
-}
-
-// DiskUsage represents the disk usage for a given path.
-type DiskUsage struct {
-       Capacity uint64 `json:"capacity"`
-       Used     uint64 `json:"used"`
-}
-
-// ErrorResponse represents the error response.
-type ErrorResponse struct {
-       Message       string `json:"message"`
-       OriginalError string `json:"original_error,omitempty"`
-}
-
-func init() {
-       metricsMux.HandleFunc("/system", systemInfoHandler)
-}
-
-func systemInfoHandler(w http.ResponseWriter, _ *http.Request) {
-       hostname, _ := os.Hostname()
-       uptime, _ := getUptime()
-       cpuUsage, _ := getCPUUsage()
-       memoryUsage, _ := getMemoryUsage()
-
-       systemInfo := &SystemInfo{
-               NodeID:      "1",
-               Roles:       []string{"meta", "ingest", "query", "data"},
-               Hostname:    hostname,
-               Uptime:      uptime,
-               CPUUsage:    cpuUsage,
-               MemoryUsage: memoryUsage,
-               Addresses:   getAddresses(),
-               DiskUsages:  make(map[string]*DiskUsage),
-       }
-       for k := range getPath() {
-               usage, _ := getDiskUsage(k)
-               systemInfo.DiskUsages[k] = &usage
-       }
-       w.Header().Set("Content-Type", "application/json")
-       w.WriteHeader(http.StatusOK)
-       if err := json.NewEncoder(w).Encode([]*SystemInfo{systemInfo}); err != 
nil {
-               w.WriteHeader(http.StatusInternalServerError)
-               errorResponse := &ErrorResponse{
-                       Message:       "Error encoding JSON response",
-                       OriginalError: err.Error(),
-               }
-               if err := json.NewEncoder(w).Encode(errorResponse); err != nil {
-                       logger.GetLogger().Error().Err(err).Msg("Error encoding 
JSON response")
-               }
-       }
-}
-
-func getUptime() (uint64, error) {
-       uptime, err := host.Uptime()
-       if err != nil {
-               return 0, err
-       }
-       return uptime, nil
-}
-
-func getCPUUsage() (float64, error) {
-       percentages, err := cpu.Percent(0, false)
-       if err != nil {
-               return 0, err
-       }
-       return percentages[0], nil
-}
-
-func getMemoryUsage() (float64, error) {
-       vm, err := mem.VirtualMemory()
-       if err != nil {
-               return 0, err
-       }
-       return vm.UsedPercent, nil
-}
-
-func getDiskUsage(path string) (DiskUsage, error) {
-       usage, err := disk.Usage(path)
-       if err != nil {
-               return DiskUsage{}, err
-       }
-       return DiskUsage{Capacity: usage.Total, Used: usage.Used}, nil
-}
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 2b880f9f..829e460a 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -112,7 +112,6 @@ func (s *server) Validate() error {
        if s.addr == ":" {
                return errNoAddr
        }
-       observability.UpdateAddress("grpc", s.addr)
        if !s.tls {
                return nil
        }
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index bd0db065..e6e7b964 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -19,7 +19,6 @@ package stream
 
 import (
        "sort"
-       "sync"
 
        "golang.org/x/exp/slices"
 
@@ -31,6 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -351,7 +351,7 @@ func generateBlock() *block {
        if v == nil {
                return &block{}
        }
-       return v.(*block)
+       return v
 }
 
 func releaseBlock(b *block) {
@@ -359,7 +359,7 @@ func releaseBlock(b *block) {
        blockPool.Put(b)
 }
 
-var blockPool sync.Pool
+var blockPool = pool.Register[*block]("stream-block")
 
 type blockCursor struct {
        p                *part
@@ -559,14 +559,14 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
        return true
 }
 
-var blockCursorPool sync.Pool
+var blockCursorPool = pool.Register[*blockCursor]("stream-blockCursor")
 
 func generateBlockCursor() *blockCursor {
        v := blockCursorPool.Get()
        if v == nil {
                return &blockCursor{}
        }
-       return v.(*blockCursor)
+       return v
 }
 
 func releaseBlockCursor(bc *blockCursor) {
@@ -668,7 +668,7 @@ func generateBlockPointer() *blockPointer {
        if v == nil {
                return &blockPointer{}
        }
-       return v.(*blockPointer)
+       return v
 }
 
 func releaseBlockPointer(bi *blockPointer) {
@@ -676,4 +676,4 @@ func releaseBlockPointer(bi *blockPointer) {
        blockPointerPool.Put(bi)
 }
 
-var blockPointerPool sync.Pool
+var blockPointerPool = pool.Register[*blockPointer]("stream-blockPointer")
diff --git a/banyand/stream/block_metadata.go b/banyand/stream/block_metadata.go
index 9f0091ee..4f410cd5 100644
--- a/banyand/stream/block_metadata.go
+++ b/banyand/stream/block_metadata.go
@@ -21,11 +21,11 @@ import (
        "errors"
        "fmt"
        "sort"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
@@ -175,7 +175,6 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, 
error) {
                        if err != nil {
                                return nil, fmt.Errorf("cannot unmarshal 
tagFamily name: %w", err)
                        }
-                       // TODO: cache dataBlock
                        tf := &dataBlock{}
                        src, err = tf.unmarshal(src)
                        if err != nil {
@@ -202,7 +201,7 @@ func generateBlockMetadata() *blockMetadata {
        if v == nil {
                return &blockMetadata{}
        }
-       return v.(*blockMetadata)
+       return v
 }
 
 func releaseBlockMetadata(bm *blockMetadata) {
@@ -210,7 +209,7 @@ func releaseBlockMetadata(bm *blockMetadata) {
        blockMetadataPool.Put(bm)
 }
 
-var blockMetadataPool sync.Pool
+var blockMetadataPool = pool.Register[*blockMetadata]("stream-blockMetadata")
 
 type blockMetadataArray struct {
        arr []blockMetadata
@@ -223,14 +222,14 @@ func (bma *blockMetadataArray) reset() {
        bma.arr = bma.arr[:0]
 }
 
-var blockMetadataArrayPool sync.Pool
+var blockMetadataArrayPool = 
pool.Register[*blockMetadataArray]("stream-blockMetadataArray")
 
 func generateBlockMetadataArray() *blockMetadataArray {
        v := blockMetadataArrayPool.Get()
        if v == nil {
                return &blockMetadataArray{}
        }
-       return v.(*blockMetadataArray)
+       return v
 }
 
 func releaseBlockMetadataArray(bma *blockMetadataArray) {
diff --git a/banyand/stream/block_reader.go b/banyand/stream/block_reader.go
index 60701515..1c8a987c 100644
--- a/banyand/stream/block_reader.go
+++ b/banyand/stream/block_reader.go
@@ -22,11 +22,11 @@ import (
        "errors"
        "fmt"
        "io"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type seqReader struct {
@@ -70,7 +70,7 @@ func (sr *seqReader) mustReadFull(data []byte) {
 
 func generateSeqReader() *seqReader {
        if v := seqReaderPool.Get(); v != nil {
-               return v.(*seqReader)
+               return v
        }
        return &seqReader{}
 }
@@ -80,7 +80,7 @@ func releaseSeqReader(sr *seqReader) {
        seqReaderPool.Put(sr)
 }
 
-var seqReaderPool sync.Pool
+var seqReaderPool = pool.Register[*seqReader]("stream-seqReader")
 
 type seqReaders struct {
        tagFamilyMetadata map[string]*seqReader
@@ -216,11 +216,11 @@ func (br *blockReader) error() error {
        return br.err
 }
 
-var blockReaderPool sync.Pool
+var blockReaderPool = pool.Register[*blockReader]("stream-blockReader")
 
 func generateBlockReader() *blockReader {
        if v := blockReaderPool.Get(); v != nil {
-               return v.(*blockReader)
+               return v
        }
        return &blockReader{}
 }
diff --git a/banyand/stream/block_writer.go b/banyand/stream/block_writer.go
index 5125bd85..f55f9ebf 100644
--- a/banyand/stream/block_writer.go
+++ b/banyand/stream/block_writer.go
@@ -19,12 +19,12 @@ package stream
 
 import (
        "path/filepath"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type writer struct {
@@ -279,7 +279,7 @@ func generateBlockWriter() *blockWriter {
                        },
                }
        }
-       return v.(*blockWriter)
+       return v
 }
 
 func releaseBlockWriter(bsw *blockWriter) {
@@ -287,4 +287,4 @@ func releaseBlockWriter(bsw *blockWriter) {
        blockWriterPool.Put(bsw)
 }
 
-var blockWriterPool sync.Pool
+var blockWriterPool = pool.Register[*blockWriter]("stream-blockWriter")
diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go
index 76c6e659..072e8b39 100644
--- a/banyand/stream/introducer.go
+++ b/banyand/stream/introducer.go
@@ -18,8 +18,7 @@
 package stream
 
 import (
-       "sync"
-
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
@@ -33,14 +32,14 @@ func (i *introduction) reset() {
        i.applied = nil
 }
 
-var introductionPool = sync.Pool{}
+var introductionPool = pool.Register[*introduction]("stream-introduction")
 
 func generateIntroduction() *introduction {
        v := introductionPool.Get()
        if v == nil {
                return &introduction{}
        }
-       intro := v.(*introduction)
+       intro := v
        intro.reset()
        return intro
 }
@@ -61,7 +60,7 @@ func (i *flusherIntroduction) reset() {
        i.applied = nil
 }
 
-var flusherIntroductionPool = sync.Pool{}
+var flusherIntroductionPool = 
pool.Register[*flusherIntroduction]("stream-flusher-introduction")
 
 func generateFlusherIntroduction() *flusherIntroduction {
        v := flusherIntroductionPool.Get()
@@ -70,7 +69,7 @@ func generateFlusherIntroduction() *flusherIntroduction {
                        flushed: make(map[uint64]*partWrapper),
                }
        }
-       fi := v.(*flusherIntroduction)
+       fi := v
        fi.reset()
        return fi
 }
@@ -95,14 +94,14 @@ func (i *mergerIntroduction) reset() {
        i.creator = 0
 }
 
-var mergerIntroductionPool = sync.Pool{}
+var mergerIntroductionPool = 
pool.Register[*mergerIntroduction]("stream-merger-introduction")
 
 func generateMergerIntroduction() *mergerIntroduction {
        v := mergerIntroductionPool.Get()
        if v == nil {
                return &mergerIntroduction{}
        }
-       mi := v.(*mergerIntroduction)
+       mi := v
        mi.reset()
        return mi
 }
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index b6ad05c7..228e607c 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -22,7 +22,6 @@ import (
        "path"
        "path/filepath"
        "sort"
-       "sync"
        "sync/atomic"
        "time"
 
@@ -30,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 const (
@@ -198,7 +198,7 @@ func generateMemPart() *memPart {
        if v == nil {
                return &memPart{}
        }
-       return v.(*memPart)
+       return v
 }
 
 func releaseMemPart(mp *memPart) {
@@ -206,7 +206,7 @@ func releaseMemPart(mp *memPart) {
        memPartPool.Put(mp)
 }
 
-var memPartPool sync.Pool
+var memPartPool = pool.Register[*memPart]("stream-memPart")
 
 type partWrapper struct {
        mp        *memPart
diff --git a/banyand/stream/part_iter.go b/banyand/stream/part_iter.go
index e9c24b98..f4277032 100644
--- a/banyand/stream/part_iter.go
+++ b/banyand/stream/part_iter.go
@@ -22,7 +22,6 @@ import (
        "fmt"
        "io"
        "sort"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
@@ -30,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type partIter struct {
@@ -324,7 +324,7 @@ func generatePartMergeIter() *partMergeIter {
        if v == nil {
                return &partMergeIter{}
        }
-       return v.(*partMergeIter)
+       return v
 }
 
 func releasePartMergeIter(pmi *partMergeIter) {
@@ -332,7 +332,7 @@ func releasePartMergeIter(pmi *partMergeIter) {
        pmiPool.Put(pmi)
 }
 
-var pmiPool sync.Pool
+var pmiPool = pool.Register[*partMergeIter]("stream-partMergeIter")
 
 type partMergeIterHeap []*partMergeIter
 
@@ -366,7 +366,7 @@ func generateColumnValuesDecoder() 
*encoding.BytesBlockDecoder {
        if v == nil {
                return &encoding.BytesBlockDecoder{}
        }
-       return v.(*encoding.BytesBlockDecoder)
+       return v
 }
 
 func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) {
@@ -374,4 +374,4 @@ func releaseColumnValuesDecoder(d 
*encoding.BytesBlockDecoder) {
        columnValuesDecoderPool.Put(d)
 }
 
-var columnValuesDecoderPool sync.Pool
+var columnValuesDecoderPool = 
pool.Register[*encoding.BytesBlockDecoder]("stream-columnValuesDecoder")
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 0a5ebdc3..7c766136 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -190,9 +190,8 @@ func (qr *queryResult) scanParts(ctx context.Context, qo 
queryOptions) error {
        defer releaseBlockMetadataArray(bma)
        defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr)
        defer defFn()
-       // TODO: cache tstIter
-       var ti tstIter
-       defer ti.reset()
+       ti := generateTstIter()
+       defer releaseTstIter(ti)
        sids := qo.sortedSids
        ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
        if ti.Error() != nil {
@@ -288,6 +287,7 @@ func (qr *queryResult) load(ctx context.Context, qo 
queryOptions) *model.StreamR
                        return blankCursorList[i] > blankCursorList[j]
                })
                for _, index := range blankCursorList {
+                       releaseBlockCursor(qr.data[index])
                        qr.data = append(qr.data[:index], qr.data[index+1:]...)
                }
                qr.loaded = true
@@ -610,6 +610,7 @@ func mustDecodeTagValue(valueType pbv1.ValueType, value 
[]byte) *modelv1.TagValu
        case pbv1.ValueTypeStrArr:
                var values []string
                bb := bigValuePool.Generate()
+               defer bigValuePool.Release(bb)
                var err error
                for len(value) > 0 {
                        bb.Buf, value, err = unmarshalVarArray(bb.Buf[:0], 
value)
diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go
index 7f5459c0..b7954810 100644
--- a/banyand/stream/tag.go
+++ b/banyand/stream/tag.go
@@ -112,7 +112,7 @@ func (t *tag) mustSeqReadValues(decoder 
*encoding.BytesBlockDecoder, reader *seq
        }
 }
 
-var bigValuePool bytes.BufferPool
+var bigValuePool = bytes.NewBufferPool("stream-big-value")
 
 type tagFamily struct {
        name string
diff --git a/banyand/stream/tag_metadata.go b/banyand/stream/tag_metadata.go
index d044f0ec..d470b6f9 100644
--- a/banyand/stream/tag_metadata.go
+++ b/banyand/stream/tag_metadata.go
@@ -19,11 +19,11 @@ package stream
 
 import (
        "fmt"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type tagMetadata struct {
@@ -131,7 +131,7 @@ func generateTagFamilyMetadata() *tagFamilyMetadata {
        if v == nil {
                return &tagFamilyMetadata{}
        }
-       return v.(*tagFamilyMetadata)
+       return v
 }
 
 func releaseTagFamilyMetadata(tfm *tagFamilyMetadata) {
@@ -139,4 +139,4 @@ func releaseTagFamilyMetadata(tfm *tagFamilyMetadata) {
        tagFamilyMetadataPool.Put(tfm)
 }
 
-var tagFamilyMetadataPool sync.Pool
+var tagFamilyMetadataPool = 
pool.Register[*tagFamilyMetadata]("stream-tagFamilyMetadata")
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index dff93b39..ea329ff1 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -34,6 +34,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -388,6 +389,21 @@ func (ti *tstIter) Error() error {
        return ti.err
 }
 
+func generateTstIter() *tstIter {
+       v := tstIterPool.Get()
+       if v == nil {
+               return &tstIter{}
+       }
+       return v
+}
+
+func releaseTstIter(ti *tstIter) {
+       ti.reset()
+       tstIterPool.Put(ti)
+}
+
+var tstIterPool = pool.Register[*tstIter]("stream-tstIter")
+
 type partIterHeap []*partIter
 
 func (pih *partIterHeap) Len() int {
diff --git a/pkg/bytes/buffer.go b/pkg/bytes/buffer.go
index 595ce54c..c657cde9 100644
--- a/pkg/bytes/buffer.go
+++ b/pkg/bytes/buffer.go
@@ -21,9 +21,9 @@ package bytes
 import (
        "fmt"
        "io"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 var (
@@ -107,9 +107,16 @@ func (r *reader) Close() error {
        return nil
 }
 
+// NewBufferPool creates a new BufferPool.
+func NewBufferPool(name string) *BufferPool {
+       return &BufferPool{
+               p: pool.Register[*Buffer](name),
+       }
+}
+
 // BufferPool is a pool of Buffer.
 type BufferPool struct {
-       p sync.Pool
+       p *pool.Synced[*Buffer]
 }
 
 // Generate generates a Buffer.
@@ -118,7 +125,7 @@ func (bp *BufferPool) Generate() *Buffer {
        if bbv == nil {
                return &Buffer{}
        }
-       return bbv.(*Buffer)
+       return bbv
 }
 
 // Release releases a Buffer.
diff --git a/pkg/encoding/bytes.go b/pkg/encoding/bytes.go
index 6aab3f89..25ece6fa 100644
--- a/pkg/encoding/bytes.go
+++ b/pkg/encoding/bytes.go
@@ -297,4 +297,4 @@ func decompressBlock(dst, src []byte) ([]byte, []byte, 
error) {
        }
 }
 
-var bbPool bytes.BufferPool
+var bbPool = bytes.NewBufferPool("encoding.bytesBlock")
diff --git a/pkg/encoding/encoder.go b/pkg/encoding/encoder.go
deleted file mode 100644
index 78add92c..00000000
--- a/pkg/encoding/encoder.go
+++ /dev/null
@@ -1,292 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package encoding
-
-import (
-       "bytes"
-       "encoding/binary"
-       "io"
-       "sync"
-       "time"
-
-       "github.com/pkg/errors"
-
-       "github.com/apache/skywalking-banyandb/pkg/convert"
-)
-
-var (
-       encoderPool = sync.Pool{
-               New: newEncoder,
-       }
-       decoderPool = sync.Pool{
-               New: func() interface{} {
-                       return &decoder{}
-               },
-       }
-
-       errInvalidValue = errors.New("invalid encoded value")
-       errNoData       = errors.New("there is no data")
-)
-
-type encoderPoolDelegator struct {
-       pool *sync.Pool
-       fn   ParseInterval
-       name string
-       size int
-}
-
-// NewEncoderPool returns a SeriesEncoderPool which provides int-based xor 
encoders.
-func NewEncoderPool(name string, size int, fn ParseInterval) SeriesEncoderPool 
{
-       return &encoderPoolDelegator{
-               name: name,
-               pool: &encoderPool,
-               size: size,
-               fn:   fn,
-       }
-}
-
-func (b *encoderPoolDelegator) Get(metadata []byte, buffer BufferWriter) 
SeriesEncoder {
-       encoder := b.pool.Get().(*encoder)
-       encoder.name = b.name
-       encoder.size = b.size
-       encoder.fn = b.fn
-       encoder.Reset(metadata, buffer)
-       return encoder
-}
-
-func (b *encoderPoolDelegator) Put(seriesEncoder SeriesEncoder) {
-       _, ok := seriesEncoder.(*encoder)
-       if ok {
-               b.pool.Put(seriesEncoder)
-       }
-}
-
-type decoderPoolDelegator struct {
-       pool *sync.Pool
-       fn   ParseInterval
-       name string
-       size int
-}
-
-// NewDecoderPool returns a SeriesDecoderPool which provides int-based xor 
decoders.
-func NewDecoderPool(name string, size int, fn ParseInterval) SeriesDecoderPool 
{
-       return &decoderPoolDelegator{
-               name: name,
-               pool: &decoderPool,
-               size: size,
-               fn:   fn,
-       }
-}
-
-func (b *decoderPoolDelegator) Get(_ []byte) SeriesDecoder {
-       decoder := b.pool.Get().(*decoder)
-       decoder.name = b.name
-       decoder.size = b.size
-       decoder.fn = b.fn
-       return decoder
-}
-
-func (b *decoderPoolDelegator) Put(seriesDecoder SeriesDecoder) {
-       _, ok := seriesDecoder.(*decoder)
-       if ok {
-               b.pool.Put(seriesDecoder)
-       }
-}
-
-var _ SeriesEncoder = (*encoder)(nil)
-
-// ParseInterval parses the interval rule from the key in a kv pair.
-type ParseInterval = func(key []byte) time.Duration
-
-type encoder struct {
-       buff      BufferWriter
-       bw        *Writer
-       values    *XOREncoder
-       fn        ParseInterval
-       name      string
-       interval  time.Duration
-       startTime uint64
-       prevTime  uint64
-       num       int
-       size      int
-}
-
-func newEncoder() interface{} {
-       bw := NewWriter()
-       return &encoder{
-               bw:     bw,
-               values: NewXOREncoder(bw),
-       }
-}
-
-func (ie *encoder) Append(ts uint64, value []byte) {
-       if len(value) > 8 {
-               return
-       }
-       if ie.startTime == 0 {
-               ie.startTime = ts
-               ie.prevTime = ts
-       } else if ie.startTime > ts {
-               ie.startTime = ts
-       }
-       gap := int(ie.prevTime) - int(ts)
-       if gap < 0 {
-               return
-       }
-       zeroNum := gap/int(ie.interval) - 1
-       for i := 0; i < zeroNum; i++ {
-               ie.bw.WriteBool(false)
-               ie.num++
-       }
-       ie.prevTime = ts
-       l := len(value)
-       ie.bw.WriteBool(l > 0)
-       ie.values.Write(convert.BytesToUint64(value))
-       ie.num++
-}
-
-func (ie *encoder) IsFull() bool {
-       return ie.num >= ie.size
-}
-
-func (ie *encoder) Reset(key []byte, buffer BufferWriter) {
-       ie.buff = buffer
-       ie.bw.Reset(buffer)
-       ie.interval = ie.fn(key)
-       ie.startTime = 0
-       ie.prevTime = 0
-       ie.num = 0
-       ie.values = NewXOREncoder(ie.bw)
-}
-
-func (ie *encoder) Encode() error {
-       ie.bw.Flush()
-       buffWriter := NewPacker(ie.buff)
-       buffWriter.PutUint64(ie.startTime)
-       buffWriter.PutUint16(uint16(ie.num))
-       return nil
-}
-
-func (ie *encoder) StartTime() uint64 {
-       return ie.startTime
-}
-
-var _ SeriesDecoder = (*decoder)(nil)
-
-type decoder struct {
-       fn        ParseInterval
-       name      string
-       area      []byte
-       size      int
-       interval  time.Duration
-       startTime uint64
-       num       int
-}
-
-func (i *decoder) Decode(key, data []byte) error {
-       if len(data) < 10 {
-               return errInvalidValue
-       }
-       i.interval = i.fn(key)
-       i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : 
len(data)-2])
-       i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
-       i.area = data[:len(data)-10]
-       return nil
-}
-
-func (i decoder) Len() int {
-       return i.num
-}
-
-func (i decoder) IsFull() bool {
-       return i.num >= i.size
-}
-
-func (i decoder) Get(ts uint64) ([]byte, error) {
-       for iter := i.Iterator(); iter.Next(); {
-               if iter.Time() == ts {
-                       return iter.Val(), nil
-               }
-       }
-       return nil, errors.WithMessagef(errNoData, "ts:%d", ts)
-}
-
-func (i decoder) Range() (start, end uint64) {
-       return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval)
-}
-
-func (i decoder) Iterator() SeriesIterator {
-       br := NewReader(bytes.NewReader(i.area))
-       return &intIterator{
-               endTime:  i.startTime + uint64(i.num*int(i.interval)),
-               interval: int(i.interval),
-               br:       br,
-               values:   NewXORDecoder(br),
-               size:     i.num,
-       }
-}
-
-var _ SeriesIterator = (*intIterator)(nil)
-
-type intIterator struct {
-       err      error
-       br       *Reader
-       values   *XORDecoder
-       endTime  uint64
-       interval int
-       size     int
-       currVal  uint64
-       currTime uint64
-       index    int
-}
-
-func (i *intIterator) Next() bool {
-       if i.index >= i.size {
-               return false
-       }
-       var b bool
-       var err error
-       for !b {
-               b, err = i.br.ReadBool()
-               if errors.Is(err, io.EOF) {
-                       return false
-               }
-               if err != nil {
-                       i.err = err
-                       return false
-               }
-               i.index++
-               i.currTime = i.endTime - uint64(i.interval*i.index)
-       }
-       if i.values.Next() {
-               i.currVal = i.values.Value()
-       }
-       return true
-}
-
-func (i *intIterator) Val() []byte {
-       return convert.Uint64ToBytes(i.currVal)
-}
-
-func (i *intIterator) Time() uint64 {
-       return i.currTime
-}
-
-func (i *intIterator) Error() error {
-       return i.err
-}
diff --git a/pkg/encoding/encoder_test.go b/pkg/encoding/encoder_test.go
deleted file mode 100644
index 840a883f..00000000
--- a/pkg/encoding/encoder_test.go
+++ /dev/null
@@ -1,392 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package encoding
-
-import (
-       "bytes"
-       "testing"
-       "time"
-
-       "github.com/stretchr/testify/assert"
-
-       "github.com/apache/skywalking-banyandb/pkg/convert"
-)
-
-func TestNewEncoderAndDecoder(t *testing.T) {
-       type tsData struct {
-               ts    []uint64
-               data  []any
-               start uint64
-               end   uint64
-       }
-       tests := []struct {
-               name string
-               args tsData
-               want tsData
-       }{
-               {
-                       name: "int golden path",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data: []any{7, 8, 7, 9},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{7, 8, 7, 9},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "int more than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 
uint64(1 * time.Minute)},
-                               data: []any{7, 8, 7, 9, 6},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{7, 8, 7, 9},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "int less than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data: []any{7, 8, 7},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data:  []any{7, 8, 7},
-                               start: uint64(time.Minute),
-                               end:   uint64(3 * time.Minute),
-                       },
-               },
-               {
-                       name: "int empty slot in the middle",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(time.Minute)},
-                               data: []any{7, 9},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(1 * time.Minute)},
-                               data:  []any{7, 9},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float64 golden path",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data: []any{7.0, 8.0, 7.0, 9.0},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{7.0, 8.0, 7.0, 9.0},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float64 more than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 
uint64(1 * time.Minute)},
-                               data: []any{0.7, 0.8, 0.7, 0.9, 0.6},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{0.7, 0.8, 0.7, 0.9},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float64 less than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data: []any{1.7, 1.8, 1.7},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data:  []any{1.7, 1.8, 1.7},
-                               start: uint64(time.Minute),
-                               end:   uint64(3 * time.Minute),
-                       },
-               },
-               {
-                       name: "float64 empty slot in the middle",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(time.Minute)},
-                               data: []any{0.700033, 0.988822},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(1 * time.Minute)},
-                               data:  []any{0.700033, 0.988822},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-       }
-       key := []byte("foo")
-       fn := func(k []byte) time.Duration {
-               assert.Equal(t, key, k)
-               return 1 * time.Minute
-       }
-       encoderPool := NewEncoderPool("minute", 4, fn)
-       decoderPool := NewDecoderPool("minute", 4, fn)
-
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       at := assert.New(t)
-                       var buffer bytes.Buffer
-                       encoder := encoderPool.Get(key, &buffer)
-                       defer encoderPool.Put(encoder)
-                       decoder := decoderPool.Get(key)
-                       defer decoderPool.Put(decoder)
-                       isFull := false
-                       for i, v := range tt.args.ts {
-                               encoder.Append(v, ToBytes(tt.args.data[i]))
-                               if encoder.IsFull() {
-                                       isFull = true
-                                       break
-                               }
-                       }
-                       err := encoder.Encode()
-                       at.NoError(err)
-
-                       at.Equal(tt.want.start, encoder.StartTime())
-                       at.NoError(decoder.Decode(key, buffer.Bytes()))
-                       start, end := decoder.Range()
-                       at.Equal(tt.want.start, start)
-                       at.Equal(tt.want.end, end)
-                       if isFull {
-                               at.True(decoder.IsFull())
-                       }
-                       i := 0
-                       for iter := decoder.Iterator(); iter.Next(); i++ {
-                               at.NoError(iter.Error())
-                               at.Equal(tt.want.ts[i], iter.Time())
-                               at.Equal(tt.want.data[i], 
BytesTo(tt.want.data[i], iter.Val()))
-                               v, err := decoder.Get(iter.Time())
-                               at.NoError(err)
-                               at.Equal(tt.want.data[i], 
BytesTo(tt.want.data[i], v))
-                       }
-                       at.Equal(len(tt.want.ts), i)
-               })
-       }
-}
-
-func ToBytes(v any) []byte {
-       switch d := v.(type) {
-       case int:
-               return convert.Int64ToBytes(int64(d))
-       case float64:
-               return convert.Float64ToBytes(d)
-       }
-       return nil
-}
-
-func BytesTo(t any, b []byte) any {
-       switch t.(type) {
-       case int:
-               return int(convert.BytesToInt64(b))
-       case float64:
-               return convert.BytesToFloat64(b)
-       }
-       return nil
-}
-
-func TestNewDecoderGet(t *testing.T) {
-       type tsData struct {
-               ts   []uint64
-               data []any
-       }
-       type wantData struct {
-               ts      []uint64
-               data    []any
-               wantErr []bool
-               start   uint64
-               end     uint64
-       }
-       tests := []struct {
-               name string
-               args tsData
-               want wantData
-       }{
-               {
-                       name: "int golden path",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data: []any{7, 8, 7, 9},
-                       },
-                       want: wantData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{7, 8, 7, 9},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "int more than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 
uint64(1 * time.Minute)},
-                               data: []any{7, 8, 7, 9, 6},
-                       },
-                       want: wantData{
-                               ts:      []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
-                               data:    []any{7, 8, 7, 9, nil},
-                               wantErr: []bool{false, false, false, false, 
true},
-                               start:   uint64(time.Minute),
-                               end:     uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "int less than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data: []any{7, 8, 7},
-                       },
-                       want: wantData{
-                               ts:    []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data:  []any{7, 8, 7},
-                               start: uint64(time.Minute),
-                               end:   uint64(3 * time.Minute),
-                       },
-               },
-               {
-                       name: "int empty slot in the middle",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(time.Minute)},
-                               data: []any{7, 9},
-                       },
-                       want: wantData{
-                               ts:      []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:    []any{7, nil, nil, 9},
-                               wantErr: []bool{false, true, true, false},
-                               start:   uint64(time.Minute),
-                               end:     uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float golden path",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data: []any{7.0, 8.0, 7.0, 9.0},
-                       },
-                       want: wantData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{7.0, 8.0, 7.0, 9.0},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float more than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 
uint64(1 * time.Minute)},
-                               data: []any{1.7, 1.8, 1.7, 1.9, 1.6},
-                       },
-                       want: wantData{
-                               ts:      []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
-                               data:    []any{1.7, 1.8, 1.7, 1.9, nil},
-                               wantErr: []bool{false, false, false, false, 
true},
-                               start:   uint64(time.Minute),
-                               end:     uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float less than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data: []any{0.71, 0.833, 0.709},
-                       },
-                       want: wantData{
-                               ts:    []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data:  []any{0.71, 0.833, 0.709},
-                               start: uint64(time.Minute),
-                               end:   uint64(3 * time.Minute),
-                       },
-               },
-               {
-                       name: "float empty slot in the middle",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(time.Minute)},
-                               data: []any{1.7, 1.9},
-                       },
-                       want: wantData{
-                               ts:      []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:    []any{1.7, nil, nil, 1.9},
-                               wantErr: []bool{false, true, true, false},
-                               start:   uint64(time.Minute),
-                               end:     uint64(4 * time.Minute),
-                       },
-               },
-       }
-       key := []byte("foo")
-       fn := func(k []byte) time.Duration {
-               assert.Equal(t, key, k)
-               return 1 * time.Minute
-       }
-       encoderPool := NewEncoderPool("minute", 4, fn)
-       decoderPool := NewDecoderPool("minute", 4, fn)
-
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       at := assert.New(t)
-                       var buffer bytes.Buffer
-                       encoder := encoderPool.Get(key, &buffer)
-                       defer encoderPool.Put(encoder)
-                       decoder := decoderPool.Get(key)
-                       defer decoderPool.Put(decoder)
-                       isFull := false
-                       for i, v := range tt.args.ts {
-                               encoder.Append(v, ToBytes(tt.args.data[i]))
-                               if encoder.IsFull() {
-                                       isFull = true
-                                       break
-                               }
-                       }
-                       err := encoder.Encode()
-                       at.NoError(err)
-
-                       at.Equal(tt.want.start, encoder.StartTime())
-                       at.NoError(decoder.Decode(key, buffer.Bytes()))
-                       start, end := decoder.Range()
-                       at.Equal(tt.want.start, start)
-                       at.Equal(tt.want.end, end)
-                       if isFull {
-                               at.True(decoder.IsFull())
-                       }
-                       for i, t := range tt.want.ts {
-                               wantErr := false
-                               if tt.want.wantErr != nil {
-                                       wantErr = tt.want.wantErr[i]
-                               }
-                               v, err := decoder.Get(t)
-                               if wantErr {
-                                       at.ErrorIs(err, errNoData)
-                               } else {
-                                       at.NoError(err)
-                                       at.Equal(tt.want.data[i], 
BytesTo(tt.want.data[i], v))
-                               }
-                       }
-               })
-       }
-}
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index 21b77482..05a227ab 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -20,7 +20,8 @@ package encoding
 import (
        "encoding/binary"
        "fmt"
-       "sync"
+
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 // Uint16ToBytes appends the bytes of the given uint16 to the given byte slice.
@@ -224,7 +225,7 @@ func GenerateInt64List(size int) *Int64List {
                        L: make([]int64, size),
                }
        }
-       is := v.(*Int64List)
+       is := v
        if n := size - cap(is.L); n > 0 {
                is.L = append(is.L[:cap(is.L)], make([]int64, n)...)
        }
@@ -243,7 +244,7 @@ type Int64List struct {
        L []int64
 }
 
-var int64ListPool sync.Pool
+var int64ListPool = pool.Register[*Int64List]("encoding-int64List")
 
 // GenerateUint64List generates a list of uint64 with the given size.
 // The returned list may be from a pool and should be released after use.
@@ -254,7 +255,7 @@ func GenerateUint64List(size int) *Uint64List {
                        L: make([]uint64, size),
                }
        }
-       is := v.(*Uint64List)
+       is := v
        if n := size - cap(is.L); n > 0 {
                is.L = append(is.L[:cap(is.L)], make([]uint64, n)...)
        }
@@ -273,4 +274,4 @@ type Uint64List struct {
        L []uint64
 }
 
-var uint64ListPool sync.Pool
+var uint64ListPool = pool.Register[*Uint64List]("encoding-uin64List")
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index ff7dac5e..39fcb22a 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -25,12 +25,12 @@ import (
        "io"
        "os"
        "path/filepath"
-       "sync"
        "time"
 
        "github.com/shirou/gopsutil/v3/disk"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 const defaultIOSize = 256 * 1024
@@ -465,7 +465,7 @@ func generateReader(f *os.File) *bufio.Reader {
        if v == nil {
                return bufio.NewReaderSize(f, defaultIOSize)
        }
-       br := v.(*bufio.Reader)
+       br := v
        br.Reset(f)
        return br
 }
@@ -475,14 +475,14 @@ func releaseReader(br *bufio.Reader) {
        bufReaderPool.Put(br)
 }
 
-var bufReaderPool sync.Pool
+var bufReaderPool = pool.Register[*bufio.Reader]("fs-bufReader")
 
 func generateWriter(f *os.File) *bufio.Writer {
        v := bufWriterPool.Get()
        if v == nil {
                return bufio.NewWriterSize(f, defaultIOSize)
        }
-       bw := v.(*bufio.Writer)
+       bw := v
        bw.Reset(f)
        return bw
 }
@@ -492,4 +492,4 @@ func releaseWriter(bw *bufio.Writer) {
        bufWriterPool.Put(bw)
 }
 
-var bufWriterPool sync.Pool
+var bufWriterPool = pool.Register[*bufio.Writer]("fs-bufWriter")
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index a481b7f3..81af6e20 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -25,7 +25,6 @@ import (
        "io"
        "log"
        "math"
-       "sync"
        "time"
 
        "github.com/blugelabs/bluge"
@@ -43,6 +42,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -88,14 +88,14 @@ type store struct {
        l      *logger.Logger
 }
 
-var batchPool sync.Pool
+var batchPool = pool.Register[*blugeIndex.Batch]("index-bluge-batch")
 
 func generateBatch() *blugeIndex.Batch {
        b := batchPool.Get()
        if b == nil {
                return bluge.NewBatch()
        }
-       return b.(*blugeIndex.Batch)
+       return b
 }
 
 func releaseBatch(b *blugeIndex.Batch) {
diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go
index 9904e11a..209211fd 100644
--- a/pkg/meter/native/collection.go
+++ b/pkg/meter/native/collection.go
@@ -19,6 +19,7 @@
 package native
 
 import (
+       "fmt"
        "time"
 
        "google.golang.org/protobuf/types/known/timestamppb"
@@ -34,6 +35,7 @@ import (
 // NodeSelector has Locate method to select a nodeId.
 type NodeSelector interface {
        Locate(group, name string, shardID uint32) (string, error)
+       fmt.Stringer
 }
 
 type collector interface {
diff --git a/pkg/node/interface.go b/pkg/node/interface.go
index 0f41aab5..c4341b7d 100644
--- a/pkg/node/interface.go
+++ b/pkg/node/interface.go
@@ -20,6 +20,7 @@ package node
 
 import (
        "context"
+       "fmt"
        "sync"
 
        "github.com/pkg/errors"
@@ -42,6 +43,7 @@ type Selector interface {
        RemoveNode(node *databasev1.Node)
        Pick(group, name string, shardID uint32) (string, error)
        run.PreRunner
+       fmt.Stringer
 }
 
 // NewPickFirstSelector returns a simple selector that always returns the 
first node if exists.
@@ -58,6 +60,17 @@ type pickFirstSelector struct {
        mu        sync.RWMutex
 }
 
+// String implements Selector.
+func (p *pickFirstSelector) String() string {
+       n, err := p.Pick("", "", 0)
+       if err != nil {
+               return fmt.Sprintf("%v", err)
+       }
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       return fmt.Sprintf("pick [%s] from %s", n, p.nodeIDs)
+}
+
 func (p *pickFirstSelector) PreRun(context.Context) error {
        return nil
 }
diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go
index 34a855f6..3efcaab6 100644
--- a/pkg/node/maglev.go
+++ b/pkg/node/maglev.go
@@ -19,6 +19,7 @@ package node
 
 import (
        "context"
+       "fmt"
        "sort"
        "strconv"
        "sync"
@@ -38,6 +39,18 @@ type maglevSelector struct {
        mutex   sync.RWMutex
 }
 
+// String implements Selector.
+func (m *maglevSelector) String() string {
+       var groups []string
+       m.routers.Range(func(key, _ any) bool {
+               groups = append(groups, key.(string))
+               return true
+       })
+       m.mutex.RLock()
+       defer m.mutex.Unlock()
+       return fmt.Sprintf("nodes:%s groups:%s", m.nodes, groups)
+}
+
 func (m *maglevSelector) Name() string {
        return "maglev-selector"
 }
diff --git a/pkg/node/round_robin.go b/pkg/node/round_robin.go
index 2815b19c..d0bb37e1 100644
--- a/pkg/node/round_robin.go
+++ b/pkg/node/round_robin.go
@@ -19,6 +19,7 @@ package node
 
 import (
        "context"
+       "encoding/json"
        "fmt"
        "slices"
        "sort"
@@ -32,22 +33,47 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
 type roundRobinSelector struct {
        schemaRegistry metadata.Repo
        closeCh        chan struct{}
-       lookupTable    sync.Map
+       lookupTable    []key
        nodes          []string
        mu             sync.RWMutex
 }
 
+func (r *roundRobinSelector) String() string {
+       r.mu.RLock()
+       defer r.mu.RUnlock()
+       result := make(map[string]string)
+       for _, entry := range r.lookupTable {
+               n, err := r.Pick(entry.group, "", entry.shardID)
+               key := fmt.Sprintf("%s-%d", entry.group, entry.shardID)
+               if err != nil {
+                       result[key] = fmt.Sprintf("%v", err)
+                       continue
+               }
+               result[key] = n
+       }
+       if len(result) < 1 {
+               return ""
+       }
+       jsonBytes, err := json.Marshal(result)
+       if err != nil {
+               return fmt.Sprintf("%v", err)
+       }
+       return convert.BytesToString(jsonBytes)
+}
+
 // NewRoundRobinSelector creates a new round-robin selector.
 func NewRoundRobinSelector(schemaRegistry metadata.Repo) Selector {
        rrs := &roundRobinSelector{
                nodes:          make([]string, 0),
                closeCh:        make(chan struct{}),
                schemaRegistry: schemaRegistry,
+               lookupTable:    make([]key, 0),
        }
        return rrs
 }
@@ -69,9 +95,11 @@ func (r *roundRobinSelector) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
        if !ok || !validateGroup(group) {
                return
        }
+       r.mu.Lock()
+       defer r.mu.Unlock()
        for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ {
                k := key{group: group.Metadata.Name, shardID: i}
-               r.lookupTable.Store(k, 0)
+               r.lookupTable = append(r.lookupTable, k)
        }
        r.sortEntries()
 }
@@ -80,12 +108,18 @@ func (r *roundRobinSelector) OnDelete(schemaMetadata 
schema.Metadata) {
        if schemaMetadata.Kind != schema.KindGroup {
                return
        }
+       r.mu.Lock()
+       defer r.mu.Unlock()
        group := schemaMetadata.Spec.(*commonv1.Group)
        for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ {
                k := key{group: group.Metadata.Name, shardID: i}
-               r.lookupTable.Delete(k)
+               for j := range r.lookupTable {
+                       if r.lookupTable[j] == k {
+                               r.lookupTable = append(r.lookupTable[:j], 
r.lookupTable[j+1:]...)
+                               break
+                       }
+               }
        }
-       r.sortEntries()
 }
 
 func (r *roundRobinSelector) OnInit(kinds []schema.Kind) (bool, []int64) {
@@ -101,8 +135,10 @@ func (r *roundRobinSelector) OnInit(kinds []schema.Kind) 
(bool, []int64) {
        if err != nil {
                panic(fmt.Sprintf("failed to list group: %v", err))
        }
+       r.mu.Lock()
+       defer r.mu.Unlock()
        var revision int64
-       r.lookupTable = sync.Map{}
+       r.lookupTable = r.lookupTable[:0]
        for _, g := range gg {
                if !validateGroup(g) {
                        continue
@@ -112,7 +148,7 @@ func (r *roundRobinSelector) OnInit(kinds []schema.Kind) 
(bool, []int64) {
                }
                for i := uint32(0); i < g.ResourceOpts.ShardNum; i++ {
                        k := key{group: g.Metadata.Name, shardID: i}
-                       r.lookupTable.Store(k, 0)
+                       r.lookupTable = append(r.lookupTable, k)
                }
        }
        r.sortEntries()
@@ -144,29 +180,26 @@ func (r *roundRobinSelector) Pick(group, _ string, 
shardID uint32) (string, erro
        if len(r.nodes) == 0 {
                return "", errors.New("no nodes available")
        }
-       entry, ok := r.lookupTable.Load(k)
-       if ok {
-               return r.selectNode(entry), nil
+       i := sort.Search(len(r.lookupTable), func(i int) bool {
+               if r.lookupTable[i].group == group {
+                       return r.lookupTable[i].shardID >= shardID
+               }
+               return r.lookupTable[i].group > group
+       })
+       if i < len(r.lookupTable) && r.lookupTable[i] == k {
+               return r.selectNode(i), nil
        }
        return "", fmt.Errorf("%s-%d is a unknown shard", group, shardID)
 }
 
 func (r *roundRobinSelector) sortEntries() {
-       var keys []key
-       r.lookupTable.Range(func(k, _ any) bool {
-               keys = append(keys, k.(key))
-               return true
-       })
-       slices.SortFunc(keys, func(a, b key) int {
+       slices.SortFunc(r.lookupTable, func(a, b key) int {
                n := strings.Compare(a.group, b.group)
                if n != 0 {
                        return n
                }
                return int(a.shardID) - int(b.shardID)
        })
-       for i := range keys {
-               r.lookupTable.Store(keys[i], i)
-       }
 }
 
 func (r *roundRobinSelector) Close() {
diff --git a/pkg/node/round_robin_test.go b/pkg/node/round_robin_test.go
index 9e8037c5..223e1908 100644
--- a/pkg/node/round_robin_test.go
+++ b/pkg/node/round_robin_test.go
@@ -111,6 +111,28 @@ func TestCleanupGroup(t *testing.T) {
        assert.Error(t, err)
 }
 
+func TestSortNodeEntries(t *testing.T) {
+       selector := &roundRobinSelector{
+               nodes: make([]string, 0),
+       }
+       setupGroup(selector)
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node3"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node2"}})
+       assert.EqualValues(t, []string{"node1", "node2", "node3"}, 
selector.nodes)
+}
+
+func TestStringer(t *testing.T) {
+       selector := NewRoundRobinSelector(nil)
+       assert.Empty(t, selector.String())
+       setupGroup(selector)
+       assert.NotEmpty(t, selector.String())
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node3"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node2"}})
+       assert.NotEmpty(t, selector.String())
+}
+
 var groupSchema = schema.Metadata{
        TypeMeta: schema.TypeMeta{
                Kind: schema.KindGroup,
diff --git a/pkg/pb/v1/series.go b/pkg/pb/v1/series.go
index 754d155e..a0e76827 100644
--- a/pkg/pb/v1/series.go
+++ b/pkg/pb/v1/series.go
@@ -19,7 +19,6 @@ package v1
 
 import (
        "sort"
-       "sync"
 
        "github.com/pkg/errors"
 
@@ -97,26 +96,6 @@ func (s *Series) reset() {
        s.Buffer = s.Buffer[:0]
 }
 
-// SeriesPool is a pool of Series.
-type SeriesPool struct {
-       pool sync.Pool
-}
-
-// Generate creates a new Series or gets one from the pool.
-func (sp *SeriesPool) Generate() *Series {
-       sv := sp.pool.Get()
-       if sv == nil {
-               return &Series{}
-       }
-       return sv.(*Series)
-}
-
-// Release puts a Series back to the pool.
-func (sp *SeriesPool) Release(s *Series) {
-       s.reset()
-       sp.pool.Put(s)
-}
-
 // SeriesList is a collection of Series.
 type SeriesList []*Series
 
diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go
new file mode 100644
index 00000000..323cb81c
--- /dev/null
+++ b/pkg/pool/pool.go
@@ -0,0 +1,81 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package pool provides a pool for reusing objects.
+package pool
+
+import (
+       "fmt"
+       "sync"
+       "sync/atomic"
+)
+
+var poolMap = sync.Map{}
+
+// Register registers a new pool with the given name.
+func Register[T any](name string) *Synced[T] {
+       p := new(Synced[T])
+       if _, ok := poolMap.LoadOrStore(name, p); ok {
+               panic(fmt.Sprintf("duplicated pool: %s", name))
+       }
+       return p
+}
+
+// AllRefsCount returns the reference count of all pools.
+func AllRefsCount() map[string]int {
+       result := make(map[string]int)
+       poolMap.Range(func(key, value any) bool {
+               result[key.(string)] = value.(Trackable).RefsCount()
+               return true
+       })
+       return result
+}
+
+// Trackable is the interface that wraps the RefsCount method.
+type Trackable interface {
+       // RefsCount returns the reference count of the pool.
+       RefsCount() int
+}
+
+// Synced is a pool that is safe for concurrent use.
+type Synced[T any] struct {
+       sync.Pool
+       refs atomic.Int32
+}
+
+// Get returns an object from the pool.
+// If the pool is empty, nil is returned.
+func (p *Synced[T]) Get() T {
+       v := p.Pool.Get()
+       p.refs.Add(1)
+       if v == nil {
+               var t T
+               return t
+       }
+       return v.(T)
+}
+
+// Put puts an object back to the pool.
+func (p *Synced[T]) Put(v T) {
+       p.Pool.Put(v)
+       p.refs.Add(-1)
+}
+
+// RefsCount returns the reference count of the pool.
+func (p *Synced[T]) RefsCount() int {
+       return int(p.refs.Load())
+}
diff --git a/pkg/test/gmatcher/gmatcher.go b/pkg/test/gmatcher/gmatcher.go
new file mode 100644
index 00000000..93b6fb1a
--- /dev/null
+++ b/pkg/test/gmatcher/gmatcher.go
@@ -0,0 +1,59 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package gmatcher provides custom Gomega matchers.
+package gmatcher
+
+import (
+       "fmt"
+
+       "github.com/onsi/gomega"
+)
+
+// HaveZeroRef returns a matcher that checks if all pools have 0 references.
+func HaveZeroRef() gomega.OmegaMatcher {
+       return &ZeroRefMatcher{}
+}
+
+var _ gomega.OmegaMatcher = &ZeroRefMatcher{}
+
+// ZeroRefMatcher is a matcher that checks if all pools have 0 references.
+type ZeroRefMatcher struct{}
+
+// FailureMessage implements types.GomegaMatcher.
+func (p *ZeroRefMatcher) FailureMessage(actual interface{}) (message string) {
+       return fmt.Sprintf("expected all pools to have 0 references, got %v", 
actual)
+}
+
+// Match implements types.GomegaMatcher.
+func (p *ZeroRefMatcher) Match(actual interface{}) (success bool, err error) {
+       data, ok := actual.(map[string]int)
+       if !ok {
+               return false, fmt.Errorf("expected map[string]int, got %T", 
actual)
+       }
+       for pooName, refers := range data {
+               if refers > 0 {
+                       return false, fmt.Errorf("pool %s has %d references", 
pooName, refers)
+               }
+       }
+       return true, nil
+}
+
+// NegatedFailureMessage implements types.GomegaMatcher.
+func (p *ZeroRefMatcher) NegatedFailureMessage(actual interface{}) (message 
string) {
+       return fmt.Sprintf("expected at least one pool to have references, got 
%v", actual)
+}
diff --git a/pkg/test/measure/testdata/groups/exception.json 
b/pkg/test/measure/testdata/groups/exception.json
new file mode 100644
index 00000000..fadbd5a5
--- /dev/null
+++ b/pkg/test/measure/testdata/groups/exception.json
@@ -0,0 +1,18 @@
+{
+  "metadata": {
+    "name": "exception"
+  },
+  "catalog": "CATALOG_MEASURE",
+  "resource_opts": {
+    "shard_num": 2,
+    "segment_interval": {
+      "unit": "UNIT_DAY",
+      "num": 1
+    },
+    "ttl": {
+      "unit": "UNIT_DAY",
+      "num": 7
+    }
+  },
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/measures/duplicated.json 
b/pkg/test/measure/testdata/measures/duplicated.json
new file mode 100644
index 00000000..e25f0a37
--- /dev/null
+++ b/pkg/test/measure/testdata/measures/duplicated.json
@@ -0,0 +1,42 @@
+{
+  "metadata": {
+    "name": "duplicated",
+    "group": "exception"
+  },
+  "tag_families": [
+    {
+      "name": "default",
+      "tags": [
+        {
+          "name": "id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "entity_id",
+          "type": "TAG_TYPE_STRING"
+        }
+      ]
+    }
+  ],
+  "fields": [
+    {
+      "name": "total",
+      "field_type": "FIELD_TYPE_INT",
+      "encoding_method": "ENCODING_METHOD_GORILLA",
+      "compression_method": "COMPRESSION_METHOD_ZSTD"
+    },
+    {
+      "name": "value",
+      "field_type": "FIELD_TYPE_INT",
+      "encoding_method": "ENCODING_METHOD_GORILLA",
+      "compression_method": "COMPRESSION_METHOD_ZSTD"
+    }
+  ],
+  "entity": {
+    "tag_names": [
+      "entity_id"
+    ]
+  },
+  "interval": "1m",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/test/cases/init.go b/test/cases/init.go
index a39e09f6..7341a96d 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -54,4 +54,5 @@ func Initialize(addr string, now time.Time) {
        casesmeasuredata.Write(conn, "service_latency_minute", "sw_metric", 
"service_latency_minute_data.json", now, interval)
        casesmeasuredata.Write(conn, "service_instance_latency_minute", 
"sw_metric", "service_instance_latency_minute_data.json", now, interval)
        casesmeasuredata.Write(conn, "service_instance_latency_minute", 
"sw_metric", "service_instance_latency_minute_data1.json", 
now.Add(1*time.Minute), interval)
+       casesmeasuredata.Write(conn, "duplicated", "exception", 
"duplicated.json", now, 0)
 }
diff --git a/test/cases/measure/data/input/duplicated_part.yaml 
b/test/cases/measure/data/input/duplicated_part.yaml
new file mode 100644
index 00000000..91900820
--- /dev/null
+++ b/test/cases/measure/data/input/duplicated_part.yaml
@@ -0,0 +1,25 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "duplicated"
+groups: ["exception"]
+tagProjection:
+  tagFamilies:
+  - name: "default"
+    tags: ["id", "entity_id"]
+fieldProjection:
+  names: ["total", "value"]
diff --git a/test/cases/measure/data/testdata/duplicated.json 
b/test/cases/measure/data/testdata/duplicated.json
new file mode 100644
index 00000000..d67feaaf
--- /dev/null
+++ b/test/cases/measure/data/testdata/duplicated.json
@@ -0,0 +1,182 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc1"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 1
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc1"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 2
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc1"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 3
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc2"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 5
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc2"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 50
+        }
+      },
+      {
+        "int": {
+          "value": 4
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "svc3"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 300
+        }
+      },
+      {
+        "int": {
+          "value": 6
+        }
+      }
+    ]
+  }
+]
diff --git a/test/cases/measure/data/want/duplicated_part.yaml 
b/test/cases/measure/data/want/duplicated_part.yaml
new file mode 100644
index 00000000..10a57e59
--- /dev/null
+++ b/test/cases/measure/data/want/duplicated_part.yaml
@@ -0,0 +1,38 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+  - name: total
+    value:
+      int:
+        value: "300"
+  - name: value
+    value:
+      int:
+        value: "6"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: svc3
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 960401ca..06b24498 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -70,4 +70,5 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
        g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 )
diff --git a/test/integration/distributed/query/query_suite_test.go 
b/test/integration/distributed/query/query_suite_test.go
index 0c78a14f..3d8eaad3 100644
--- a/test/integration/distributed/query/query_suite_test.go
+++ b/test/integration/distributed/query/query_suite_test.go
@@ -35,8 +35,10 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
@@ -132,4 +134,5 @@ var _ = SynchronizedAfterSuite(func() {
 }, func() {
        deferFunc()
        Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
 })
diff --git a/test/integration/etcd/client_test.go 
b/test/integration/etcd/client_test.go
index a78d8345..ddfb4797 100644
--- a/test/integration/etcd/client_test.go
+++ b/test/integration/etcd/client_test.go
@@ -36,8 +36,10 @@ import (
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
 )
@@ -76,6 +78,7 @@ var _ = Describe("Client Test", func() {
        AfterEach(func() {
                dirSpaceDef()
                Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
        })
 
        It("should be using user/password connect etcd server successfully", 
func() {
diff --git a/test/integration/load/load_suite_test.go 
b/test/integration/load/load_suite_test.go
index 1858e380..84dccc38 100644
--- a/test/integration/load/load_suite_test.go
+++ b/test/integration/load/load_suite_test.go
@@ -36,8 +36,10 @@ import (
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        cases_stream_data 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
@@ -157,6 +159,7 @@ var _ = Describe("Load Test Suit", func() {
                }
                deferFunc()
                Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
        })
 })
 
diff --git a/test/integration/standalone/cold_query/query_suite_test.go 
b/test/integration/standalone/cold_query/query_suite_test.go
index 18e832ad..a28c145e 100644
--- a/test/integration/standalone/cold_query/query_suite_test.go
+++ b/test/integration/standalone/cold_query/query_suite_test.go
@@ -29,7 +29,9 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -89,4 +91,5 @@ var _ = SynchronizedAfterSuite(func() {
 }, func() {
        deferFunc()
        Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
 })
diff --git a/test/integration/standalone/other/measure_test.go 
b/test/integration/standalone/other/measure_test.go
index a2ab6e81..a0ced9ac 100644
--- a/test/integration/standalone/other/measure_test.go
+++ b/test/integration/standalone/other/measure_test.go
@@ -27,7 +27,9 @@ import (
        "google.golang.org/grpc/credentials/insecure"
 
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -57,6 +59,7 @@ var _ = g.Describe("Query service_cpm_minute", func() {
                gm.Expect(conn.Close()).To(gm.Succeed())
                deferFn()
                gm.Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               gm.Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
        })
        g.It("queries service_cpm_minute by id after updating", func() {
                casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data1.json", baseTime, interval)
diff --git a/test/integration/standalone/other/property_test.go 
b/test/integration/standalone/other/property_test.go
index ab217059..164b3ce3 100644
--- a/test/integration/standalone/other/property_test.go
+++ b/test/integration/standalone/other/property_test.go
@@ -32,7 +32,9 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
 )
 
@@ -57,6 +59,7 @@ var _ = Describe("Property application", func() {
                Expect(conn.Close()).To(Succeed())
                deferFn()
                Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
        })
        It("applies properties", func() {
                md := &propertyv1.Metadata{
diff --git a/test/integration/standalone/query/query_suite_test.go 
b/test/integration/standalone/query/query_suite_test.go
index a1e684b2..765cd893 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/integration/standalone/query/query_suite_test.go
@@ -29,7 +29,9 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -90,4 +92,5 @@ var _ = SynchronizedAfterSuite(func() {
 }, func() {
        deferFunc()
        Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
 })
diff --git 
a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go 
b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
index f6fcb14c..346b8918 100644
--- a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
+++ b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
@@ -29,8 +29,10 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -103,4 +105,5 @@ var _ = SynchronizedAfterSuite(func() {
 }, func() {
        deferFunc()
        Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
 })

Reply via email to