This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch pool
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 974fcf32e853abbb03ae9ec58a750a412d00ad4f
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Tue Aug 6 14:40:52 2024 +0800

    Add the tracked pool to fix leak issues
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 .github/workflows/e2e.storage.yml                  |   2 +-
 CHANGES.md                                         |   1 +
 banyand/internal/storage/index_test.go             |  10 +-
 banyand/measure/block.go                           |  14 +-
 banyand/measure/block_metadata.go                  |  11 +-
 banyand/measure/block_reader.go                    |  10 +-
 banyand/measure/block_reader_test.go               |  15 +-
 banyand/measure/block_writer.go                    |   6 +-
 banyand/measure/column.go                          |   2 +-
 banyand/measure/column_metadata.go                 |   6 +-
 banyand/measure/introducer.go                      |  15 +-
 banyand/measure/part.go                            |  30 +-
 banyand/measure/part_iter.go                       |  10 +-
 banyand/measure/query.go                           |   5 +-
 banyand/measure/tstable.go                         |  16 +
 banyand/measure/tstable_test.go                    |  62 ++++
 banyand/observability/meter_prom.go                |  12 +-
 banyand/observability/service.go                   |  11 +-
 banyand/stream/block.go                            |  14 +-
 banyand/stream/block_metadata.go                   |  11 +-
 banyand/stream/block_reader.go                     |  10 +-
 banyand/stream/block_writer.go                     |   6 +-
 banyand/stream/introducer.go                       |  15 +-
 banyand/stream/part.go                             |   6 +-
 banyand/stream/part_iter.go                        |  10 +-
 banyand/stream/query.go                            |   7 +-
 banyand/stream/tag.go                              |   2 +-
 banyand/stream/tag_metadata.go                     |   6 +-
 banyand/stream/tstable.go                          |  16 +
 pkg/bytes/buffer.go                                |  12 +-
 pkg/encoding/bytes.go                              |   2 +-
 pkg/encoding/encoder.go                            | 292 ---------------
 pkg/encoding/encoder_test.go                       | 392 ---------------------
 pkg/encoding/int.go                                |  11 +-
 pkg/fs/local_file_system.go                        |  10 +-
 pkg/index/inverted/inverted.go                     |   6 +-
 pkg/pb/v1/series.go                                |  21 --
 pkg/pool/pool.go                                   |  71 ++++
 pkg/test/gmatcher/gmatcher.go                      |  57 +++
 test/cases/measure/data/data.go                    |   2 -
 .../distributed/query/query_suite_test.go          |   3 +
 test/integration/etcd/client_test.go               |   3 +
 test/integration/load/load_suite_test.go           |   3 +
 .../standalone/cold_query/query_suite_test.go      |   3 +
 test/integration/standalone/other/measure_test.go  |   3 +
 test/integration/standalone/other/property_test.go |   3 +
 .../standalone/query/query_suite_test.go           |   3 +
 .../query_ondisk/query_ondisk_suite_test.go        |   3 +
 48 files changed, 403 insertions(+), 838 deletions(-)

diff --git a/.github/workflows/e2e.storage.yml 
b/.github/workflows/e2e.storage.yml
index 69711ba8..a6c4ba1e 100644
--- a/.github/workflows/e2e.storage.yml
+++ b/.github/workflows/e2e.storage.yml
@@ -93,7 +93,7 @@ jobs:
           make docker.build || make docker.build
           docker image ls
       - name: ${{ matrix.test.name }}
-        uses: 
apache/skywalking-infra-e2e@1485ae03f0ad90496ad7626a5ae4a6a73a1f6296
+        uses: 
apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180
         with:
           e2e-file: $GITHUB_WORKSPACE/${{ matrix.test.config }}
 
diff --git a/CHANGES.md b/CHANGES.md
index 8355e291..55dbcfe9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -35,6 +35,7 @@ Release Notes.
 - Fix the bug that segment's reference count is increased twice when the 
controller try to create an existing segment.
 - Fix a bug where a distributed query would return an empty result if the 
"limit" was set much lower than the "offset".
 - Fix duplicated measure data in a single part.
+- Fix several "sync.Pool" leak issues by adding a tracker to the pool.
 
 ### Documentation
 
diff --git a/banyand/internal/storage/index_test.go 
b/banyand/internal/storage/index_test.go
index 0e61b623..d73886a3 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -33,8 +33,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
-var testSeriesPool pbv1.SeriesPool
-
 func TestSeriesIndex_Primary(t *testing.T) {
        ctx := context.Background()
        path, fn := setUp(require.New(t))
@@ -46,7 +44,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
        }()
        var docs index.Documents
        for i := 0; i < 100; i++ {
-               series := testSeriesPool.Generate()
+               var series pbv1.Series
                series.Subject = "service_instance_latency"
                series.EntityValues = []*modelv1.TagValue{
                        {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
fmt.Sprintf("svc_%d", i)}}},
@@ -64,7 +62,6 @@ func TestSeriesIndex_Primary(t *testing.T) {
                }
                copy(doc.EntityValues, series.Buffer)
                docs = append(docs, doc)
-               testSeriesPool.Release(series)
        }
        require.NoError(t, si.Write(docs))
        // Restart the index
@@ -155,11 +152,10 @@ func TestSeriesIndex_Primary(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        var seriesQueries []*pbv1.Series
                        for i := range tt.entityValues {
-                               seriesQuery := testSeriesPool.Generate()
-                               defer testSeriesPool.Release(seriesQuery)
+                               var seriesQuery pbv1.Series
                                seriesQuery.Subject = tt.subject
                                seriesQuery.EntityValues = tt.entityValues[i]
-                               seriesQueries = append(seriesQueries, 
seriesQuery)
+                               seriesQueries = append(seriesQueries, 
&seriesQuery)
                        }
                        sl, _, err := si.searchPrimary(ctx, seriesQueries, nil)
                        require.NoError(t, err)
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 001940cc..ae5006e6 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -20,7 +20,6 @@ package measure
 import (
        "slices"
        "sort"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -29,6 +28,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -403,7 +403,7 @@ func generateBlock() *block {
        if v == nil {
                return &block{}
        }
-       return v.(*block)
+       return v
 }
 
 func releaseBlock(b *block) {
@@ -411,7 +411,7 @@ func releaseBlock(b *block) {
        blockPool.Put(b)
 }
 
-var blockPool sync.Pool
+var blockPool = pool.Register[*block]("measure-block")
 
 type blockCursor struct {
        p                   *part
@@ -705,14 +705,14 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
        return true
 }
 
-var blockCursorPool sync.Pool
+var blockCursorPool = pool.Register[*blockCursor]("measure-blockCursor")
 
 func generateBlockCursor() *blockCursor {
        v := blockCursorPool.Get()
        if v == nil {
                return &blockCursor{}
        }
-       return v.(*blockCursor)
+       return v
 }
 
 func releaseBlockCursor(bc *blockCursor) {
@@ -832,7 +832,7 @@ func generateBlockPointer() *blockPointer {
        if v == nil {
                return &blockPointer{}
        }
-       return v.(*blockPointer)
+       return v
 }
 
 func releaseBlockPointer(bi *blockPointer) {
@@ -840,4 +840,4 @@ func releaseBlockPointer(bi *blockPointer) {
        blockPointerPool.Put(bi)
 }
 
-var blockPointerPool sync.Pool
+var blockPointerPool = pool.Register[*blockPointer]("measure-blockPointer")
diff --git a/banyand/measure/block_metadata.go 
b/banyand/measure/block_metadata.go
index 41093282..ff23c4af 100644
--- a/banyand/measure/block_metadata.go
+++ b/banyand/measure/block_metadata.go
@@ -21,11 +21,11 @@ import (
        "errors"
        "fmt"
        "sort"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
@@ -170,7 +170,6 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, 
error) {
                        if err != nil {
                                return nil, fmt.Errorf("cannot unmarshal 
tagFamily name: %w", err)
                        }
-                       // TODO: cache dataBlock
                        tf := &dataBlock{}
                        src, err = tf.unmarshal(src)
                        if err != nil {
@@ -198,7 +197,7 @@ func generateBlockMetadata() *blockMetadata {
        if v == nil {
                return &blockMetadata{}
        }
-       return v.(*blockMetadata)
+       return v
 }
 
 func releaseBlockMetadata(bm *blockMetadata) {
@@ -206,7 +205,7 @@ func releaseBlockMetadata(bm *blockMetadata) {
        blockMetadataPool.Put(bm)
 }
 
-var blockMetadataPool sync.Pool
+var blockMetadataPool = pool.Register[*blockMetadata]("measure-blockMetadata")
 
 type blockMetadataArray struct {
        arr []blockMetadata
@@ -219,14 +218,14 @@ func (bma *blockMetadataArray) reset() {
        bma.arr = bma.arr[:0]
 }
 
-var blockMetadataArrayPool sync.Pool
+var blockMetadataArrayPool = 
pool.Register[*blockMetadataArray]("measure-blockMetadataArray")
 
 func generateBlockMetadataArray() *blockMetadataArray {
        v := blockMetadataArrayPool.Get()
        if v == nil {
                return &blockMetadataArray{}
        }
-       return v.(*blockMetadataArray)
+       return v
 }
 
 func releaseBlockMetadataArray(bma *blockMetadataArray) {
diff --git a/banyand/measure/block_reader.go b/banyand/measure/block_reader.go
index eb221eef..238b6833 100644
--- a/banyand/measure/block_reader.go
+++ b/banyand/measure/block_reader.go
@@ -22,11 +22,11 @@ import (
        "errors"
        "fmt"
        "io"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type seqReader struct {
@@ -70,7 +70,7 @@ func (sr *seqReader) mustReadFull(data []byte) {
 
 func generateSeqReader() *seqReader {
        if v := seqReaderPool.Get(); v != nil {
-               return v.(*seqReader)
+               return v
        }
        return &seqReader{}
 }
@@ -80,7 +80,7 @@ func releaseSeqReader(sr *seqReader) {
        seqReaderPool.Put(sr)
 }
 
-var seqReaderPool sync.Pool
+var seqReaderPool = pool.Register[*seqReader]("measure-seqReader")
 
 type seqReaders struct {
        tagFamilyMetadata map[string]*seqReader
@@ -219,11 +219,11 @@ func (br *blockReader) error() error {
        return br.err
 }
 
-var blockReaderPool sync.Pool
+var blockReaderPool = pool.Register[*blockReader]("measure-blockReader")
 
 func generateBlockReader() *blockReader {
        if v := blockReaderPool.Get(); v != nil {
-               return v.(*blockReader)
+               return v
        }
        return &blockReader{}
 }
diff --git a/banyand/measure/block_reader_test.go 
b/banyand/measure/block_reader_test.go
index 63f3184e..cd576b78 100644
--- a/banyand/measure/block_reader_test.go
+++ b/banyand/measure/block_reader_test.go
@@ -62,6 +62,13 @@ func Test_blockReader_nextBlock(t *testing.T) {
                                {seriesID: 3, count: 1, uncompressedSizeBytes: 
24},
                        },
                },
+               {
+                       name:    "Test with a single part with same ts",
+                       dpsList: []*dataPoints{duplicatedDps},
+                       want: []blockMetadata{
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
24},
+                       },
+               },
                {
                        name:    "Test with multiple parts with same ts",
                        dpsList: []*dataPoints{dpsTS1, dpsTS1},
@@ -77,7 +84,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       verify := func(pp []*part) {
+                       verify := func(t *testing.T, pp []*part) {
                                var pii []*partMergeIter
                                for _, p := range pp {
                                        pmi := &partMergeIter{}
@@ -116,7 +123,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
                                }
                        }
 
-                       t.Run("memory parts", func(_ *testing.T) {
+                       t.Run("memory parts", func(t *testing.T) {
                                var mpp []*memPart
                                defer func() {
                                        for _, mp := range mpp {
@@ -130,7 +137,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
                                        mp.mustInitFromDataPoints(dps)
                                        pp = append(pp, openMemPart(mp))
                                }
-                               verify(pp)
+                               verify(t, pp)
                        })
 
                        t.Run("file parts", func(t *testing.T) {
@@ -158,7 +165,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
                                        fpp = append(fpp, filePW)
                                        pp = append(pp, filePW.p)
                                }
-                               verify(pp)
+                               verify(t, pp)
                        })
                })
        }
diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go
index fd065f1a..4a07291c 100644
--- a/banyand/measure/block_writer.go
+++ b/banyand/measure/block_writer.go
@@ -19,12 +19,12 @@ package measure
 
 import (
        "path/filepath"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type writer struct {
@@ -285,7 +285,7 @@ func generateBlockWriter() *blockWriter {
                        },
                }
        }
-       return v.(*blockWriter)
+       return v
 }
 
 func releaseBlockWriter(bsw *blockWriter) {
@@ -293,4 +293,4 @@ func releaseBlockWriter(bsw *blockWriter) {
        blockWriterPool.Put(bsw)
 }
 
-var blockWriterPool sync.Pool
+var blockWriterPool = pool.Register[*blockWriter]("measure-blockWriter")
diff --git a/banyand/measure/column.go b/banyand/measure/column.go
index eafa0134..35f34ab8 100644
--- a/banyand/measure/column.go
+++ b/banyand/measure/column.go
@@ -112,7 +112,7 @@ func (c *column) mustSeqReadValues(decoder 
*encoding.BytesBlockDecoder, reader *
        }
 }
 
-var bigValuePool bytes.BufferPool
+var bigValuePool = bytes.NewBufferPool("measure-big-value")
 
 type columnFamily struct {
        name    string
diff --git a/banyand/measure/column_metadata.go 
b/banyand/measure/column_metadata.go
index 3f1eb957..3f7102af 100644
--- a/banyand/measure/column_metadata.go
+++ b/banyand/measure/column_metadata.go
@@ -36,11 +36,11 @@ package measure
 
 import (
        "fmt"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type columnMetadata struct {
@@ -148,7 +148,7 @@ func generateColumnFamilyMetadata() *columnFamilyMetadata {
        if v == nil {
                return &columnFamilyMetadata{}
        }
-       return v.(*columnFamilyMetadata)
+       return v
 }
 
 func releaseColumnFamilyMetadata(cfm *columnFamilyMetadata) {
@@ -156,4 +156,4 @@ func releaseColumnFamilyMetadata(cfm *columnFamilyMetadata) 
{
        columnFamilyMetadataPool.Put(cfm)
 }
 
-var columnFamilyMetadataPool sync.Pool
+var columnFamilyMetadataPool = 
pool.Register[*columnFamilyMetadata]("measure-columnFamilyMetadata")
diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go
index 7fce11d1..0bfb2e23 100644
--- a/banyand/measure/introducer.go
+++ b/banyand/measure/introducer.go
@@ -18,8 +18,7 @@
 package measure
 
 import (
-       "sync"
-
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
@@ -33,14 +32,14 @@ func (i *introduction) reset() {
        i.applied = nil
 }
 
-var introductionPool = sync.Pool{}
+var introductionPool = pool.Register[*introduction]("measure-introduction")
 
 func generateIntroduction() *introduction {
        v := introductionPool.Get()
        if v == nil {
                return &introduction{}
        }
-       i := v.(*introduction)
+       i := v
        i.reset()
        return i
 }
@@ -61,7 +60,7 @@ func (i *flusherIntroduction) reset() {
        i.applied = nil
 }
 
-var flusherIntroductionPool = sync.Pool{}
+var flusherIntroductionPool = 
pool.Register[*flusherIntroduction]("measure-flusher-introduction")
 
 func generateFlusherIntroduction() *flusherIntroduction {
        v := flusherIntroductionPool.Get()
@@ -70,7 +69,7 @@ func generateFlusherIntroduction() *flusherIntroduction {
                        flushed: make(map[uint64]*partWrapper),
                }
        }
-       i := v.(*flusherIntroduction)
+       i := v
        i.reset()
        return i
 }
@@ -95,14 +94,14 @@ func (i *mergerIntroduction) reset() {
        i.creator = 0
 }
 
-var mergerIntroductionPool = sync.Pool{}
+var mergerIntroductionPool = 
pool.Register[*mergerIntroduction]("measure-merger-introduction")
 
 func generateMergerIntroduction() *mergerIntroduction {
        v := mergerIntroductionPool.Get()
        if v == nil {
                return &mergerIntroduction{}
        }
-       i := v.(*mergerIntroduction)
+       i := v
        i.reset()
        return i
 }
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index dd4c11ae..41baf29b 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -22,7 +22,6 @@ import (
        "path"
        "path/filepath"
        "sort"
-       "sync"
        "sync/atomic"
        "time"
 
@@ -30,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 const (
@@ -150,23 +150,29 @@ func (mp *memPart) mustInitFromDataPoints(dps 
*dataPoints) {
        var sidPrev common.SeriesID
        uncompressedBlockSizeBytes := uint64(0)
        var indexPrev int
-       var prevTS int64
+       var tsPrev int64
        for i := 0; i < len(dps.timestamps); i++ {
                sid := dps.seriesIDs[i]
                if sidPrev == 0 {
                        sidPrev = sid
                }
-               if prevTS == dps.timestamps[i] {
-                       dps.skip(i)
-                       i--
-                       continue
+
+               if sid == sidPrev {
+                       if tsPrev == dps.timestamps[i] {
+                               dps.skip(i)
+                               i--
+                               continue
+                       }
+                       tsPrev = dps.timestamps[i]
+               } else {
+                       tsPrev = 0
                }
-               prevTS = dps.timestamps[i]
 
                if uncompressedBlockSizeBytes >= maxUncompressedBlockSize ||
                        (i-indexPrev) > maxBlockLength || sid != sidPrev {
                        bsw.MustWriteDataPoints(sidPrev, 
dps.timestamps[indexPrev:i], dps.versions[indexPrev:i], 
dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i])
                        sidPrev = sid
+                       tsPrev = 0
                        indexPrev = i
                        uncompressedBlockSizeBytes = 0
                }
@@ -216,7 +222,7 @@ func generateMemPart() *memPart {
        if v == nil {
                return &memPart{}
        }
-       return v.(*memPart)
+       return v
 }
 
 func releaseMemPart(mp *memPart) {
@@ -224,7 +230,7 @@ func releaseMemPart(mp *memPart) {
        memPartPool.Put(mp)
 }
 
-var memPartPool sync.Pool
+var memPartPool = pool.Register[*memPart]("measure-memPart")
 
 type partWrapper struct {
        mp        *memPart
@@ -246,6 +252,12 @@ func (pw *partWrapper) decRef() {
        if n > 0 {
                return
        }
+       if pw.mp != nil {
+               releaseMemPart(pw.mp)
+               pw.mp = nil
+               pw.p = nil
+               return
+       }
        pw.p.close()
        if pw.removable.Load() && pw.p.fileSystem != nil {
                go func(pw *partWrapper) {
diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go
index 0c916334..cf67d320 100644
--- a/banyand/measure/part_iter.go
+++ b/banyand/measure/part_iter.go
@@ -22,7 +22,6 @@ import (
        "fmt"
        "io"
        "sort"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
@@ -30,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type partIter struct {
@@ -327,7 +327,7 @@ func generatePartMergeIter() *partMergeIter {
        if v == nil {
                return &partMergeIter{}
        }
-       return v.(*partMergeIter)
+       return v
 }
 
 func releasePartMergeIter(pmi *partMergeIter) {
@@ -335,7 +335,7 @@ func releasePartMergeIter(pmi *partMergeIter) {
        pmiPool.Put(pmi)
 }
 
-var pmiPool sync.Pool
+var pmiPool = pool.Register[*partMergeIter]("measure-partMergeIter")
 
 type partMergeIterHeap []*partMergeIter
 
@@ -369,7 +369,7 @@ func generateColumnValuesDecoder() 
*encoding.BytesBlockDecoder {
        if v == nil {
                return &encoding.BytesBlockDecoder{}
        }
-       return v.(*encoding.BytesBlockDecoder)
+       return v
 }
 
 func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) {
@@ -377,4 +377,4 @@ func releaseColumnValuesDecoder(d 
*encoding.BytesBlockDecoder) {
        columnValuesDecoderPool.Put(d)
 }
 
-var columnValuesDecoderPool sync.Pool
+var columnValuesDecoderPool = 
pool.Register[*encoding.BytesBlockDecoder]("measure-columnValuesDecoder")
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index ae37521a..2bda84da 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -242,9 +242,8 @@ func (s *measure) searchBlocks(ctx context.Context, result 
*queryResult, sids []
        defer releaseBlockMetadataArray(bma)
        defFn := startBlockScanSpan(ctx, len(sids), parts, result)
        defer defFn()
-       // TODO: cache tstIter
-       var tstIter tstIter
-       defer tstIter.reset()
+       tstIter := generateTstIter()
+       defer releaseTstIter(tstIter)
        originalSids := make([]common.SeriesID, len(sids))
        copy(originalSids, sids)
        sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index d2a20b4a..033979a8 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -376,6 +377,21 @@ func (ti *tstIter) Error() error {
        return ti.err
 }
 
+func generateTstIter() *tstIter {
+       v := tstIterPool.Get()
+       if v == nil {
+               return &tstIter{}
+       }
+       return v
+}
+
+func releaseTstIter(ti *tstIter) {
+       ti.reset()
+       tstIterPool.Put(ti)
+}
+
+var tstIterPool = pool.Register[*tstIter]("measure-tstIter")
+
 type partIterHeap []*partIter
 
 func (pih *partIterHeap) Len() int {
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index 57ea15a4..386f57b1 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -214,6 +214,16 @@ func Test_tstIter(t *testing.T) {
                                        {seriesID: 3, count: 1, 
uncompressedSizeBytes: 24},
                                },
                        },
+                       {
+                               name:         "Test with a single part with 
same ts",
+                               dpsList:      []*dataPoints{duplicatedDps},
+                               sids:         []common.SeriesID{1},
+                               minTimestamp: 1,
+                               maxTimestamp: 1,
+                               want: []blockMetadata{
+                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 24},
+                               },
+                       },
                        {
                                name:         "Test with multiple parts with 
same ts",
                                dpsList:      []*dataPoints{dpsTS1, dpsTS1},
@@ -568,6 +578,58 @@ var dpsTS2 = &dataPoints{
        },
 }
 
+var duplicatedDps = &dataPoints{
+       seriesIDs:  []common.SeriesID{1, 1, 1},
+       timestamps: []int64{1, 1, 1},
+       versions:   []int64{1, 2, 3},
+       tagFamilies: [][]nameValues{
+               {
+                       {
+                               name: "arrTag", values: []*nameValue{
+                                       {name: "strArrTag", valueType: 
pbv1.ValueTypeStrArr, value: nil, valueArr: [][]byte{[]byte("value1"), 
[]byte("value2")}},
+                                       {name: "intArrTag", valueType: 
pbv1.ValueTypeInt64Arr, value: nil, valueArr: 
[][]byte{convert.Int64ToBytes(25), convert.Int64ToBytes(30)}},
+                               },
+                       },
+                       {
+                               name: "binaryTag", values: []*nameValue{
+                                       {name: "binaryTag", valueType: 
pbv1.ValueTypeBinaryData, value: longText, valueArr: nil},
+                               },
+                       },
+                       {
+                               name: "singleTag", values: []*nameValue{
+                                       {name: "strTag", valueType: 
pbv1.ValueTypeStr, value: []byte("value1"), valueArr: nil},
+                                       {name: "intTag", valueType: 
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(10), valueArr: nil},
+                               },
+                       },
+               },
+               {
+                       {
+                               name: "singleTag", values: []*nameValue{
+                                       {name: "strTag1", valueType: 
pbv1.ValueTypeStr, value: []byte("tag1"), valueArr: nil},
+                                       {name: "strTag2", valueType: 
pbv1.ValueTypeStr, value: []byte("tag2"), valueArr: nil},
+                               },
+                       },
+               },
+               {}, // empty tagFamilies for seriesID 3
+       },
+       fields: []nameValues{
+               {
+                       name: "skipped", values: []*nameValue{
+                               {name: "strField", valueType: 
pbv1.ValueTypeStr, value: []byte("field1"), valueArr: nil},
+                               {name: "intField", valueType: 
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1110), valueArr: nil},
+                               {name: "floatField", valueType: 
pbv1.ValueTypeFloat64, value: convert.Float64ToBytes(1221233.343), valueArr: 
nil},
+                               {name: "binaryField", valueType: 
pbv1.ValueTypeBinaryData, value: longText, valueArr: nil},
+                       },
+               },
+               {}, // empty fields for seriesID 2
+               {
+                       name: "onlyFields", values: []*nameValue{
+                               {name: "intField", valueType: 
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1110), valueArr: nil},
+                       },
+               },
+       },
+}
+
 func generateHugeDps(startTimestamp, endTimestamp, timestamp int64) 
*dataPoints {
        hugeDps := &dataPoints{
                seriesIDs:   []common.SeriesID{},
diff --git a/banyand/observability/meter_prom.go 
b/banyand/observability/meter_prom.go
index 260b442f..ea5d2d8a 100644
--- a/banyand/observability/meter_prom.go
+++ b/banyand/observability/meter_prom.go
@@ -18,6 +18,7 @@
 package observability
 
 import (
+       "net/http"
        "sync"
 
        grpcprom 
"github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
@@ -42,10 +43,6 @@ var (
 func init() {
        reg.MustRegister(collectors.NewGoCollector())
        
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
-       metricsMux.Handle("/metrics", promhttp.HandlerFor(
-               reg,
-               promhttp.HandlerOpts{},
-       ))
 }
 
 // NewMeterProvider returns a meter.Provider based on the given scope.
@@ -53,6 +50,13 @@ func newPromMeterProvider() meter.Provider {
        return prom.NewProvider(SystemScope, reg)
 }
 
+func registerMetricsEndpoint(metricsMux *http.ServeMux) {
+       metricsMux.Handle("/metrics", promhttp.HandlerFor(
+               reg,
+               promhttp.HandlerOpts{},
+       ))
+}
+
 // MetricsServerInterceptor returns a server interceptor for metrics.
 func promMetricsServerInterceptor() (grpc.UnaryServerInterceptor, 
grpc.StreamServerInterceptor) {
        once.Do(func() {
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index 68bd30e3..4170b09f 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -42,9 +42,8 @@ const (
 )
 
 var (
-       _          run.Service = (*metricService)(nil)
-       _          run.Config  = (*metricService)(nil)
-       metricsMux             = http.NewServeMux()
+       _ run.Service = (*metricService)(nil)
+       _ run.Config  = (*metricService)(nil)
        // MetricsServerInterceptor is the function to obtain grpc metrics 
interceptor.
        MetricsServerInterceptor func() (grpc.UnaryServerInterceptor, 
grpc.StreamServerInterceptor) = emptyMetricsServerInterceptor
 )
@@ -127,7 +126,6 @@ func (p *metricService) PreRun(ctx context.Context) error {
                NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata, nodeInfo)
        }
        initMetrics(p.modes)
-       metricsMux.HandleFunc("/_route", p.routeTableHandler)
        return nil
 }
 
@@ -147,6 +145,11 @@ func (p *metricService) Serve() run.StopNotify {
        if err != nil {
                p.l.Fatal().Err(err).Msg("Failed to register metrics collector")
        }
+       metricsMux := http.NewServeMux()
+       metricsMux.HandleFunc("/_route", p.routeTableHandler)
+       if containsMode(p.modes, flagPromethusMode) {
+               registerMetricsEndpoint(metricsMux)
+       }
        if containsMode(p.modes, flagNativeMode) {
                err = p.scheduler.Register("native-metric-collection", cron.Descriptor, "@every 5s", func(_ time.Time, _ *logger.Logger) bool {
                        NativeMetricCollection.FlushMetrics()
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index bd0db065..e6e7b964 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -19,7 +19,6 @@ package stream
 
 import (
        "sort"
-       "sync"
 
        "golang.org/x/exp/slices"
 
@@ -31,6 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -351,7 +351,7 @@ func generateBlock() *block {
        if v == nil {
                return &block{}
        }
-       return v.(*block)
+       return v
 }
 
 func releaseBlock(b *block) {
@@ -359,7 +359,7 @@ func releaseBlock(b *block) {
        blockPool.Put(b)
 }
 
-var blockPool sync.Pool
+var blockPool = pool.Register[*block]("stream-block")
 
 type blockCursor struct {
        p                *part
@@ -559,14 +559,14 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
        return true
 }
 
-var blockCursorPool sync.Pool
+var blockCursorPool = pool.Register[*blockCursor]("stream-blockCursor")
 
 func generateBlockCursor() *blockCursor {
        v := blockCursorPool.Get()
        if v == nil {
                return &blockCursor{}
        }
-       return v.(*blockCursor)
+       return v
 }
 
 func releaseBlockCursor(bc *blockCursor) {
@@ -668,7 +668,7 @@ func generateBlockPointer() *blockPointer {
        if v == nil {
                return &blockPointer{}
        }
-       return v.(*blockPointer)
+       return v
 }
 
 func releaseBlockPointer(bi *blockPointer) {
@@ -676,4 +676,4 @@ func releaseBlockPointer(bi *blockPointer) {
        blockPointerPool.Put(bi)
 }
 
-var blockPointerPool sync.Pool
+var blockPointerPool = pool.Register[*blockPointer]("stream-blockPointer")
diff --git a/banyand/stream/block_metadata.go b/banyand/stream/block_metadata.go
index 9f0091ee..4f410cd5 100644
--- a/banyand/stream/block_metadata.go
+++ b/banyand/stream/block_metadata.go
@@ -21,11 +21,11 @@ import (
        "errors"
        "fmt"
        "sort"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
@@ -175,7 +175,6 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) {
                        if err != nil {
                                return nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err)
                        }
-                       // TODO: cache dataBlock
                        tf := &dataBlock{}
                        src, err = tf.unmarshal(src)
                        if err != nil {
@@ -202,7 +201,7 @@ func generateBlockMetadata() *blockMetadata {
        if v == nil {
                return &blockMetadata{}
        }
-       return v.(*blockMetadata)
+       return v
 }
 
 func releaseBlockMetadata(bm *blockMetadata) {
@@ -210,7 +209,7 @@ func releaseBlockMetadata(bm *blockMetadata) {
        blockMetadataPool.Put(bm)
 }
 
-var blockMetadataPool sync.Pool
+var blockMetadataPool = pool.Register[*blockMetadata]("stream-blockMetadata")
 
 type blockMetadataArray struct {
        arr []blockMetadata
@@ -223,14 +222,14 @@ func (bma *blockMetadataArray) reset() {
        bma.arr = bma.arr[:0]
 }
 
-var blockMetadataArrayPool sync.Pool
+var blockMetadataArrayPool = pool.Register[*blockMetadataArray]("stream-blockMetadataArray")
 
 func generateBlockMetadataArray() *blockMetadataArray {
        v := blockMetadataArrayPool.Get()
        if v == nil {
                return &blockMetadataArray{}
        }
-       return v.(*blockMetadataArray)
+       return v
 }
 
 func releaseBlockMetadataArray(bma *blockMetadataArray) {
diff --git a/banyand/stream/block_reader.go b/banyand/stream/block_reader.go
index 60701515..1c8a987c 100644
--- a/banyand/stream/block_reader.go
+++ b/banyand/stream/block_reader.go
@@ -22,11 +22,11 @@ import (
        "errors"
        "fmt"
        "io"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type seqReader struct {
@@ -70,7 +70,7 @@ func (sr *seqReader) mustReadFull(data []byte) {
 
 func generateSeqReader() *seqReader {
        if v := seqReaderPool.Get(); v != nil {
-               return v.(*seqReader)
+               return v
        }
        return &seqReader{}
 }
@@ -80,7 +80,7 @@ func releaseSeqReader(sr *seqReader) {
        seqReaderPool.Put(sr)
 }
 
-var seqReaderPool sync.Pool
+var seqReaderPool = pool.Register[*seqReader]("stream-seqReader")
 
 type seqReaders struct {
        tagFamilyMetadata map[string]*seqReader
@@ -216,11 +216,11 @@ func (br *blockReader) error() error {
        return br.err
 }
 
-var blockReaderPool sync.Pool
+var blockReaderPool = pool.Register[*blockReader]("stream-blockReader")
 
 func generateBlockReader() *blockReader {
        if v := blockReaderPool.Get(); v != nil {
-               return v.(*blockReader)
+               return v
        }
        return &blockReader{}
 }
diff --git a/banyand/stream/block_writer.go b/banyand/stream/block_writer.go
index 5125bd85..f55f9ebf 100644
--- a/banyand/stream/block_writer.go
+++ b/banyand/stream/block_writer.go
@@ -19,12 +19,12 @@ package stream
 
 import (
        "path/filepath"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type writer struct {
@@ -279,7 +279,7 @@ func generateBlockWriter() *blockWriter {
                        },
                }
        }
-       return v.(*blockWriter)
+       return v
 }
 
 func releaseBlockWriter(bsw *blockWriter) {
@@ -287,4 +287,4 @@ func releaseBlockWriter(bsw *blockWriter) {
        blockWriterPool.Put(bsw)
 }
 
-var blockWriterPool sync.Pool
+var blockWriterPool = pool.Register[*blockWriter]("stream-blockWriter")
diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go
index 76c6e659..072e8b39 100644
--- a/banyand/stream/introducer.go
+++ b/banyand/stream/introducer.go
@@ -18,8 +18,7 @@
 package stream
 
 import (
-       "sync"
-
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
@@ -33,14 +32,14 @@ func (i *introduction) reset() {
        i.applied = nil
 }
 
-var introductionPool = sync.Pool{}
+var introductionPool = pool.Register[*introduction]("stream-introduction")
 
 func generateIntroduction() *introduction {
        v := introductionPool.Get()
        if v == nil {
                return &introduction{}
        }
-       intro := v.(*introduction)
+       intro := v
        intro.reset()
        return intro
 }
@@ -61,7 +60,7 @@ func (i *flusherIntroduction) reset() {
        i.applied = nil
 }
 
-var flusherIntroductionPool = sync.Pool{}
+var flusherIntroductionPool = pool.Register[*flusherIntroduction]("stream-flusher-introduction")
 
 func generateFlusherIntroduction() *flusherIntroduction {
        v := flusherIntroductionPool.Get()
@@ -70,7 +69,7 @@ func generateFlusherIntroduction() *flusherIntroduction {
                        flushed: make(map[uint64]*partWrapper),
                }
        }
-       fi := v.(*flusherIntroduction)
+       fi := v
        fi.reset()
        return fi
 }
@@ -95,14 +94,14 @@ func (i *mergerIntroduction) reset() {
        i.creator = 0
 }
 
-var mergerIntroductionPool = sync.Pool{}
+var mergerIntroductionPool = pool.Register[*mergerIntroduction]("stream-merger-introduction")
 
 func generateMergerIntroduction() *mergerIntroduction {
        v := mergerIntroductionPool.Get()
        if v == nil {
                return &mergerIntroduction{}
        }
-       mi := v.(*mergerIntroduction)
+       mi := v
        mi.reset()
        return mi
 }
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index b6ad05c7..228e607c 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -22,7 +22,6 @@ import (
        "path"
        "path/filepath"
        "sort"
-       "sync"
        "sync/atomic"
        "time"
 
@@ -30,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 const (
@@ -198,7 +198,7 @@ func generateMemPart() *memPart {
        if v == nil {
                return &memPart{}
        }
-       return v.(*memPart)
+       return v
 }
 
 func releaseMemPart(mp *memPart) {
@@ -206,7 +206,7 @@ func releaseMemPart(mp *memPart) {
        memPartPool.Put(mp)
 }
 
-var memPartPool sync.Pool
+var memPartPool = pool.Register[*memPart]("stream-memPart")
 
 type partWrapper struct {
        mp        *memPart
diff --git a/banyand/stream/part_iter.go b/banyand/stream/part_iter.go
index e9c24b98..f4277032 100644
--- a/banyand/stream/part_iter.go
+++ b/banyand/stream/part_iter.go
@@ -22,7 +22,6 @@ import (
        "fmt"
        "io"
        "sort"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
@@ -30,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type partIter struct {
@@ -324,7 +324,7 @@ func generatePartMergeIter() *partMergeIter {
        if v == nil {
                return &partMergeIter{}
        }
-       return v.(*partMergeIter)
+       return v
 }
 
 func releasePartMergeIter(pmi *partMergeIter) {
@@ -332,7 +332,7 @@ func releasePartMergeIter(pmi *partMergeIter) {
        pmiPool.Put(pmi)
 }
 
-var pmiPool sync.Pool
+var pmiPool = pool.Register[*partMergeIter]("stream-partMergeIter")
 
 type partMergeIterHeap []*partMergeIter
 
@@ -366,7 +366,7 @@ func generateColumnValuesDecoder() *encoding.BytesBlockDecoder {
        if v == nil {
                return &encoding.BytesBlockDecoder{}
        }
-       return v.(*encoding.BytesBlockDecoder)
+       return v
 }
 
 func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) {
@@ -374,4 +374,4 @@ func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) {
        columnValuesDecoderPool.Put(d)
 }
 
-var columnValuesDecoderPool sync.Pool
+var columnValuesDecoderPool = pool.Register[*encoding.BytesBlockDecoder]("stream-columnValuesDecoder")
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 9dff205f..827c896d 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -190,9 +190,8 @@ func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error {
        defer releaseBlockMetadataArray(bma)
        defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr)
        defer defFn()
-       // TODO: cache tstIter
-       var ti tstIter
-       defer ti.reset()
+       ti := generateTstIter()
+       defer releaseTstIter(ti)
        sids := qo.sortedSids
        ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
        if ti.Error() != nil {
@@ -288,6 +287,7 @@ func (qr *queryResult) load(ctx context.Context, qo queryOptions) *model.StreamR
                        return blankCursorList[i] > blankCursorList[j]
                })
                for _, index := range blankCursorList {
+                       releaseBlockCursor(qr.data[index])
                        qr.data = append(qr.data[:index], qr.data[index+1:]...)
                }
                qr.loaded = true
@@ -610,6 +610,7 @@ func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValu
        case pbv1.ValueTypeStrArr:
                var values []string
                bb := bigValuePool.Generate()
+               defer bigValuePool.Release(bb)
                var err error
                for len(value) > 0 {
                        bb.Buf, value, err = unmarshalVarArray(bb.Buf[:0], value)
diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go
index 7f5459c0..b7954810 100644
--- a/banyand/stream/tag.go
+++ b/banyand/stream/tag.go
@@ -112,7 +112,7 @@ func (t *tag) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader *seq
        }
 }
 
-var bigValuePool bytes.BufferPool
+var bigValuePool = bytes.NewBufferPool("stream-big-value")
 
 type tagFamily struct {
        name string
diff --git a/banyand/stream/tag_metadata.go b/banyand/stream/tag_metadata.go
index d044f0ec..d470b6f9 100644
--- a/banyand/stream/tag_metadata.go
+++ b/banyand/stream/tag_metadata.go
@@ -19,11 +19,11 @@ package stream
 
 import (
        "fmt"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 type tagMetadata struct {
@@ -131,7 +131,7 @@ func generateTagFamilyMetadata() *tagFamilyMetadata {
        if v == nil {
                return &tagFamilyMetadata{}
        }
-       return v.(*tagFamilyMetadata)
+       return v
 }
 
 func releaseTagFamilyMetadata(tfm *tagFamilyMetadata) {
@@ -139,4 +139,4 @@ func releaseTagFamilyMetadata(tfm *tagFamilyMetadata) {
        tagFamilyMetadataPool.Put(tfm)
 }
 
-var tagFamilyMetadataPool sync.Pool
+var tagFamilyMetadataPool = pool.Register[*tagFamilyMetadata]("stream-tagFamilyMetadata")
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index dff93b39..ea329ff1 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -34,6 +34,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -388,6 +389,21 @@ func (ti *tstIter) Error() error {
        return ti.err
 }
 
+func generateTstIter() *tstIter {
+       v := tstIterPool.Get()
+       if v == nil {
+               return &tstIter{}
+       }
+       return v
+}
+
+func releaseTstIter(ti *tstIter) {
+       ti.reset()
+       tstIterPool.Put(ti)
+}
+
+var tstIterPool = pool.Register[*tstIter]("stream-tstIter")
+
 type partIterHeap []*partIter
 
 func (pih *partIterHeap) Len() int {
diff --git a/pkg/bytes/buffer.go b/pkg/bytes/buffer.go
index 595ce54c..2086e147 100644
--- a/pkg/bytes/buffer.go
+++ b/pkg/bytes/buffer.go
@@ -21,9 +21,9 @@ package bytes
 import (
        "fmt"
        "io"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 var (
@@ -107,9 +107,15 @@ func (r *reader) Close() error {
        return nil
 }
 
+func NewBufferPool(name string) *BufferPool {
+       return &BufferPool{
+               p: pool.Register[*Buffer](name),
+       }
+}
+
 // BufferPool is a pool of Buffer.
 type BufferPool struct {
-       p sync.Pool
+       p *pool.Synced[*Buffer]
 }
 
 // Generate generates a Buffer.
@@ -118,7 +124,7 @@ func (bp *BufferPool) Generate() *Buffer {
        if bbv == nil {
                return &Buffer{}
        }
-       return bbv.(*Buffer)
+       return bbv
 }
 
 // Release releases a Buffer.
diff --git a/pkg/encoding/bytes.go b/pkg/encoding/bytes.go
index 6aab3f89..25ece6fa 100644
--- a/pkg/encoding/bytes.go
+++ b/pkg/encoding/bytes.go
@@ -297,4 +297,4 @@ func decompressBlock(dst, src []byte) ([]byte, []byte, error) {
        }
 }
 
-var bbPool bytes.BufferPool
+var bbPool = bytes.NewBufferPool("encoding.bytesBlock")
diff --git a/pkg/encoding/encoder.go b/pkg/encoding/encoder.go
deleted file mode 100644
index 78add92c..00000000
--- a/pkg/encoding/encoder.go
+++ /dev/null
@@ -1,292 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package encoding
-
-import (
-       "bytes"
-       "encoding/binary"
-       "io"
-       "sync"
-       "time"
-
-       "github.com/pkg/errors"
-
-       "github.com/apache/skywalking-banyandb/pkg/convert"
-)
-
-var (
-       encoderPool = sync.Pool{
-               New: newEncoder,
-       }
-       decoderPool = sync.Pool{
-               New: func() interface{} {
-                       return &decoder{}
-               },
-       }
-
-       errInvalidValue = errors.New("invalid encoded value")
-       errNoData       = errors.New("there is no data")
-)
-
-type encoderPoolDelegator struct {
-       pool *sync.Pool
-       fn   ParseInterval
-       name string
-       size int
-}
-
-// NewEncoderPool returns a SeriesEncoderPool which provides int-based xor encoders.
-func NewEncoderPool(name string, size int, fn ParseInterval) SeriesEncoderPool {
-       return &encoderPoolDelegator{
-               name: name,
-               pool: &encoderPool,
-               size: size,
-               fn:   fn,
-       }
-}
-
-func (b *encoderPoolDelegator) Get(metadata []byte, buffer BufferWriter) SeriesEncoder {
-       encoder := b.pool.Get().(*encoder)
-       encoder.name = b.name
-       encoder.size = b.size
-       encoder.fn = b.fn
-       encoder.Reset(metadata, buffer)
-       return encoder
-}
-
-func (b *encoderPoolDelegator) Put(seriesEncoder SeriesEncoder) {
-       _, ok := seriesEncoder.(*encoder)
-       if ok {
-               b.pool.Put(seriesEncoder)
-       }
-}
-
-type decoderPoolDelegator struct {
-       pool *sync.Pool
-       fn   ParseInterval
-       name string
-       size int
-}
-
-// NewDecoderPool returns a SeriesDecoderPool which provides int-based xor decoders.
-func NewDecoderPool(name string, size int, fn ParseInterval) SeriesDecoderPool {
-       return &decoderPoolDelegator{
-               name: name,
-               pool: &decoderPool,
-               size: size,
-               fn:   fn,
-       }
-}
-
-func (b *decoderPoolDelegator) Get(_ []byte) SeriesDecoder {
-       decoder := b.pool.Get().(*decoder)
-       decoder.name = b.name
-       decoder.size = b.size
-       decoder.fn = b.fn
-       return decoder
-}
-
-func (b *decoderPoolDelegator) Put(seriesDecoder SeriesDecoder) {
-       _, ok := seriesDecoder.(*decoder)
-       if ok {
-               b.pool.Put(seriesDecoder)
-       }
-}
-
-var _ SeriesEncoder = (*encoder)(nil)
-
-// ParseInterval parses the interval rule from the key in a kv pair.
-type ParseInterval = func(key []byte) time.Duration
-
-type encoder struct {
-       buff      BufferWriter
-       bw        *Writer
-       values    *XOREncoder
-       fn        ParseInterval
-       name      string
-       interval  time.Duration
-       startTime uint64
-       prevTime  uint64
-       num       int
-       size      int
-}
-
-func newEncoder() interface{} {
-       bw := NewWriter()
-       return &encoder{
-               bw:     bw,
-               values: NewXOREncoder(bw),
-       }
-}
-
-func (ie *encoder) Append(ts uint64, value []byte) {
-       if len(value) > 8 {
-               return
-       }
-       if ie.startTime == 0 {
-               ie.startTime = ts
-               ie.prevTime = ts
-       } else if ie.startTime > ts {
-               ie.startTime = ts
-       }
-       gap := int(ie.prevTime) - int(ts)
-       if gap < 0 {
-               return
-       }
-       zeroNum := gap/int(ie.interval) - 1
-       for i := 0; i < zeroNum; i++ {
-               ie.bw.WriteBool(false)
-               ie.num++
-       }
-       ie.prevTime = ts
-       l := len(value)
-       ie.bw.WriteBool(l > 0)
-       ie.values.Write(convert.BytesToUint64(value))
-       ie.num++
-}
-
-func (ie *encoder) IsFull() bool {
-       return ie.num >= ie.size
-}
-
-func (ie *encoder) Reset(key []byte, buffer BufferWriter) {
-       ie.buff = buffer
-       ie.bw.Reset(buffer)
-       ie.interval = ie.fn(key)
-       ie.startTime = 0
-       ie.prevTime = 0
-       ie.num = 0
-       ie.values = NewXOREncoder(ie.bw)
-}
-
-func (ie *encoder) Encode() error {
-       ie.bw.Flush()
-       buffWriter := NewPacker(ie.buff)
-       buffWriter.PutUint64(ie.startTime)
-       buffWriter.PutUint16(uint16(ie.num))
-       return nil
-}
-
-func (ie *encoder) StartTime() uint64 {
-       return ie.startTime
-}
-
-var _ SeriesDecoder = (*decoder)(nil)
-
-type decoder struct {
-       fn        ParseInterval
-       name      string
-       area      []byte
-       size      int
-       interval  time.Duration
-       startTime uint64
-       num       int
-}
-
-func (i *decoder) Decode(key, data []byte) error {
-       if len(data) < 10 {
-               return errInvalidValue
-       }
-       i.interval = i.fn(key)
-       i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
-       i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
-       i.area = data[:len(data)-10]
-       return nil
-}
-
-func (i decoder) Len() int {
-       return i.num
-}
-
-func (i decoder) IsFull() bool {
-       return i.num >= i.size
-}
-
-func (i decoder) Get(ts uint64) ([]byte, error) {
-       for iter := i.Iterator(); iter.Next(); {
-               if iter.Time() == ts {
-                       return iter.Val(), nil
-               }
-       }
-       return nil, errors.WithMessagef(errNoData, "ts:%d", ts)
-}
-
-func (i decoder) Range() (start, end uint64) {
-       return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval)
-}
-
-func (i decoder) Iterator() SeriesIterator {
-       br := NewReader(bytes.NewReader(i.area))
-       return &intIterator{
-               endTime:  i.startTime + uint64(i.num*int(i.interval)),
-               interval: int(i.interval),
-               br:       br,
-               values:   NewXORDecoder(br),
-               size:     i.num,
-       }
-}
-
-var _ SeriesIterator = (*intIterator)(nil)
-
-type intIterator struct {
-       err      error
-       br       *Reader
-       values   *XORDecoder
-       endTime  uint64
-       interval int
-       size     int
-       currVal  uint64
-       currTime uint64
-       index    int
-}
-
-func (i *intIterator) Next() bool {
-       if i.index >= i.size {
-               return false
-       }
-       var b bool
-       var err error
-       for !b {
-               b, err = i.br.ReadBool()
-               if errors.Is(err, io.EOF) {
-                       return false
-               }
-               if err != nil {
-                       i.err = err
-                       return false
-               }
-               i.index++
-               i.currTime = i.endTime - uint64(i.interval*i.index)
-       }
-       if i.values.Next() {
-               i.currVal = i.values.Value()
-       }
-       return true
-}
-
-func (i *intIterator) Val() []byte {
-       return convert.Uint64ToBytes(i.currVal)
-}
-
-func (i *intIterator) Time() uint64 {
-       return i.currTime
-}
-
-func (i *intIterator) Error() error {
-       return i.err
-}
diff --git a/pkg/encoding/encoder_test.go b/pkg/encoding/encoder_test.go
deleted file mode 100644
index 840a883f..00000000
--- a/pkg/encoding/encoder_test.go
+++ /dev/null
@@ -1,392 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package encoding
-
-import (
-       "bytes"
-       "testing"
-       "time"
-
-       "github.com/stretchr/testify/assert"
-
-       "github.com/apache/skywalking-banyandb/pkg/convert"
-)
-
-func TestNewEncoderAndDecoder(t *testing.T) {
-       type tsData struct {
-               ts    []uint64
-               data  []any
-               start uint64
-               end   uint64
-       }
-       tests := []struct {
-               name string
-               args tsData
-               want tsData
-       }{
-               {
-                       name: "int golden path",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data: []any{7, 8, 7, 9},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{7, 8, 7, 9},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "int more than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 
uint64(1 * time.Minute)},
-                               data: []any{7, 8, 7, 9, 6},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{7, 8, 7, 9},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "int less than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data: []any{7, 8, 7},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data:  []any{7, 8, 7},
-                               start: uint64(time.Minute),
-                               end:   uint64(3 * time.Minute),
-                       },
-               },
-               {
-                       name: "int empty slot in the middle",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(time.Minute)},
-                               data: []any{7, 9},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(1 * time.Minute)},
-                               data:  []any{7, 9},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float64 golden path",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data: []any{7.0, 8.0, 7.0, 9.0},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{7.0, 8.0, 7.0, 9.0},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float64 more than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 
uint64(1 * time.Minute)},
-                               data: []any{0.7, 0.8, 0.7, 0.9, 0.6},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{0.7, 0.8, 0.7, 0.9},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float64 less than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data: []any{1.7, 1.8, 1.7},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data:  []any{1.7, 1.8, 1.7},
-                               start: uint64(time.Minute),
-                               end:   uint64(3 * time.Minute),
-                       },
-               },
-               {
-                       name: "float64 empty slot in the middle",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(time.Minute)},
-                               data: []any{0.700033, 0.988822},
-                       },
-                       want: tsData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(1 * time.Minute)},
-                               data:  []any{0.700033, 0.988822},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-       }
-       key := []byte("foo")
-       fn := func(k []byte) time.Duration {
-               assert.Equal(t, key, k)
-               return 1 * time.Minute
-       }
-       encoderPool := NewEncoderPool("minute", 4, fn)
-       decoderPool := NewDecoderPool("minute", 4, fn)
-
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       at := assert.New(t)
-                       var buffer bytes.Buffer
-                       encoder := encoderPool.Get(key, &buffer)
-                       defer encoderPool.Put(encoder)
-                       decoder := decoderPool.Get(key)
-                       defer decoderPool.Put(decoder)
-                       isFull := false
-                       for i, v := range tt.args.ts {
-                               encoder.Append(v, ToBytes(tt.args.data[i]))
-                               if encoder.IsFull() {
-                                       isFull = true
-                                       break
-                               }
-                       }
-                       err := encoder.Encode()
-                       at.NoError(err)
-
-                       at.Equal(tt.want.start, encoder.StartTime())
-                       at.NoError(decoder.Decode(key, buffer.Bytes()))
-                       start, end := decoder.Range()
-                       at.Equal(tt.want.start, start)
-                       at.Equal(tt.want.end, end)
-                       if isFull {
-                               at.True(decoder.IsFull())
-                       }
-                       i := 0
-                       for iter := decoder.Iterator(); iter.Next(); i++ {
-                               at.NoError(iter.Error())
-                               at.Equal(tt.want.ts[i], iter.Time())
-                               at.Equal(tt.want.data[i], 
BytesTo(tt.want.data[i], iter.Val()))
-                               v, err := decoder.Get(iter.Time())
-                               at.NoError(err)
-                               at.Equal(tt.want.data[i], 
BytesTo(tt.want.data[i], v))
-                       }
-                       at.Equal(len(tt.want.ts), i)
-               })
-       }
-}
-
-func ToBytes(v any) []byte {
-       switch d := v.(type) {
-       case int:
-               return convert.Int64ToBytes(int64(d))
-       case float64:
-               return convert.Float64ToBytes(d)
-       }
-       return nil
-}
-
-func BytesTo(t any, b []byte) any {
-       switch t.(type) {
-       case int:
-               return int(convert.BytesToInt64(b))
-       case float64:
-               return convert.BytesToFloat64(b)
-       }
-       return nil
-}
-
-func TestNewDecoderGet(t *testing.T) {
-       type tsData struct {
-               ts   []uint64
-               data []any
-       }
-       type wantData struct {
-               ts      []uint64
-               data    []any
-               wantErr []bool
-               start   uint64
-               end     uint64
-       }
-       tests := []struct {
-               name string
-               args tsData
-               want wantData
-       }{
-               {
-                       name: "int golden path",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data: []any{7, 8, 7, 9},
-                       },
-                       want: wantData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{7, 8, 7, 9},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "int more than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 
uint64(1 * time.Minute)},
-                               data: []any{7, 8, 7, 9, 6},
-                       },
-                       want: wantData{
-                               ts:      []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
-                               data:    []any{7, 8, 7, 9, nil},
-                               wantErr: []bool{false, false, false, false, 
true},
-                               start:   uint64(time.Minute),
-                               end:     uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "int less than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data: []any{7, 8, 7},
-                       },
-                       want: wantData{
-                               ts:    []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data:  []any{7, 8, 7},
-                               start: uint64(time.Minute),
-                               end:   uint64(3 * time.Minute),
-                       },
-               },
-               {
-                       name: "int empty slot in the middle",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(time.Minute)},
-                               data: []any{7, 9},
-                       },
-                       want: wantData{
-                               ts:      []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:    []any{7, nil, nil, 9},
-                               wantErr: []bool{false, true, true, false},
-                               start:   uint64(time.Minute),
-                               end:     uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float golden path",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data: []any{7.0, 8.0, 7.0, 9.0},
-                       },
-                       want: wantData{
-                               ts:    []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:  []any{7.0, 8.0, 7.0, 9.0},
-                               start: uint64(time.Minute),
-                               end:   uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float more than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 
uint64(1 * time.Minute)},
-                               data: []any{1.7, 1.8, 1.7, 1.9, 1.6},
-                       },
-                       want: wantData{
-                               ts:      []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
-                               data:    []any{1.7, 1.8, 1.7, 1.9, nil},
-                               wantErr: []bool{false, false, false, false, 
true},
-                               start:   uint64(time.Minute),
-                               end:     uint64(4 * time.Minute),
-                       },
-               },
-               {
-                       name: "float less than the size",
-                       args: tsData{
-                               ts:   []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data: []any{0.71, 0.833, 0.709},
-                       },
-                       want: wantData{
-                               ts:    []uint64{uint64(3 * time.Minute), 
uint64(2 * time.Minute), uint64(time.Minute)},
-                               data:  []any{0.71, 0.833, 0.709},
-                               start: uint64(time.Minute),
-                               end:   uint64(3 * time.Minute),
-                       },
-               },
-               {
-                       name: "float empty slot in the middle",
-                       args: tsData{
-                               ts:   []uint64{uint64(4 * time.Minute), 
uint64(time.Minute)},
-                               data: []any{1.7, 1.9},
-                       },
-                       want: wantData{
-                               ts:      []uint64{uint64(4 * time.Minute), 
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
-                               data:    []any{1.7, nil, nil, 1.9},
-                               wantErr: []bool{false, true, true, false},
-                               start:   uint64(time.Minute),
-                               end:     uint64(4 * time.Minute),
-                       },
-               },
-       }
-       key := []byte("foo")
-       fn := func(k []byte) time.Duration {
-               assert.Equal(t, key, k)
-               return 1 * time.Minute
-       }
-       encoderPool := NewEncoderPool("minute", 4, fn)
-       decoderPool := NewDecoderPool("minute", 4, fn)
-
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       at := assert.New(t)
-                       var buffer bytes.Buffer
-                       encoder := encoderPool.Get(key, &buffer)
-                       defer encoderPool.Put(encoder)
-                       decoder := decoderPool.Get(key)
-                       defer decoderPool.Put(decoder)
-                       isFull := false
-                       for i, v := range tt.args.ts {
-                               encoder.Append(v, ToBytes(tt.args.data[i]))
-                               if encoder.IsFull() {
-                                       isFull = true
-                                       break
-                               }
-                       }
-                       err := encoder.Encode()
-                       at.NoError(err)
-
-                       at.Equal(tt.want.start, encoder.StartTime())
-                       at.NoError(decoder.Decode(key, buffer.Bytes()))
-                       start, end := decoder.Range()
-                       at.Equal(tt.want.start, start)
-                       at.Equal(tt.want.end, end)
-                       if isFull {
-                               at.True(decoder.IsFull())
-                       }
-                       for i, t := range tt.want.ts {
-                               wantErr := false
-                               if tt.want.wantErr != nil {
-                                       wantErr = tt.want.wantErr[i]
-                               }
-                               v, err := decoder.Get(t)
-                               if wantErr {
-                                       at.ErrorIs(err, errNoData)
-                               } else {
-                                       at.NoError(err)
-                                       at.Equal(tt.want.data[i], 
BytesTo(tt.want.data[i], v))
-                               }
-                       }
-               })
-       }
-}
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index 21b77482..05a227ab 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -20,7 +20,8 @@ package encoding
 import (
        "encoding/binary"
        "fmt"
-       "sync"
+
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 // Uint16ToBytes appends the bytes of the given uint16 to the given byte slice.
@@ -224,7 +225,7 @@ func GenerateInt64List(size int) *Int64List {
                        L: make([]int64, size),
                }
        }
-       is := v.(*Int64List)
+       is := v
        if n := size - cap(is.L); n > 0 {
                is.L = append(is.L[:cap(is.L)], make([]int64, n)...)
        }
@@ -243,7 +244,7 @@ type Int64List struct {
        L []int64
 }
 
-var int64ListPool sync.Pool
+var int64ListPool = pool.Register[*Int64List]("encoding-int64List")
 
 // GenerateUint64List generates a list of uint64 with the given size.
 // The returned list may be from a pool and should be released after use.
@@ -254,7 +255,7 @@ func GenerateUint64List(size int) *Uint64List {
                        L: make([]uint64, size),
                }
        }
-       is := v.(*Uint64List)
+       is := v
        if n := size - cap(is.L); n > 0 {
                is.L = append(is.L[:cap(is.L)], make([]uint64, n)...)
        }
@@ -273,4 +274,4 @@ type Uint64List struct {
        L []uint64
 }
 
-var uint64ListPool sync.Pool
+var uint64ListPool = pool.Register[*Uint64List]("encoding-uint64List")
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index ff7dac5e..39fcb22a 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -25,12 +25,12 @@ import (
        "io"
        "os"
        "path/filepath"
-       "sync"
        "time"
 
        "github.com/shirou/gopsutil/v3/disk"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 const defaultIOSize = 256 * 1024
@@ -465,7 +465,7 @@ func generateReader(f *os.File) *bufio.Reader {
        if v == nil {
                return bufio.NewReaderSize(f, defaultIOSize)
        }
-       br := v.(*bufio.Reader)
+       br := v
        br.Reset(f)
        return br
 }
@@ -475,14 +475,14 @@ func releaseReader(br *bufio.Reader) {
        bufReaderPool.Put(br)
 }
 
-var bufReaderPool sync.Pool
+var bufReaderPool = pool.Register[*bufio.Reader]("fs-bufReader")
 
 func generateWriter(f *os.File) *bufio.Writer {
        v := bufWriterPool.Get()
        if v == nil {
                return bufio.NewWriterSize(f, defaultIOSize)
        }
-       bw := v.(*bufio.Writer)
+       bw := v
        bw.Reset(f)
        return bw
 }
@@ -492,4 +492,4 @@ func releaseWriter(bw *bufio.Writer) {
        bufWriterPool.Put(bw)
 }
 
-var bufWriterPool sync.Pool
+var bufWriterPool = pool.Register[*bufio.Writer]("fs-bufWriter")
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 4d831fb2..0da32e02 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -25,7 +25,6 @@ import (
        "io"
        "log"
        "math"
-       "sync"
        "time"
 
        "github.com/blugelabs/bluge"
@@ -43,6 +42,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -87,14 +87,14 @@ type store struct {
        l      *logger.Logger
 }
 
-var batchPool sync.Pool
+var batchPool = pool.Register[*blugeIndex.Batch]("index-bluge-batch")
 
 func generateBatch() *blugeIndex.Batch {
        b := batchPool.Get()
        if b == nil {
                return bluge.NewBatch()
        }
-       return b.(*blugeIndex.Batch)
+       return b
 }
 
 func releaseBatch(b *blugeIndex.Batch) {
diff --git a/pkg/pb/v1/series.go b/pkg/pb/v1/series.go
index 754d155e..a0e76827 100644
--- a/pkg/pb/v1/series.go
+++ b/pkg/pb/v1/series.go
@@ -19,7 +19,6 @@ package v1
 
 import (
        "sort"
-       "sync"
 
        "github.com/pkg/errors"
 
@@ -97,26 +96,6 @@ func (s *Series) reset() {
        s.Buffer = s.Buffer[:0]
 }
 
-// SeriesPool is a pool of Series.
-type SeriesPool struct {
-       pool sync.Pool
-}
-
-// Generate creates a new Series or gets one from the pool.
-func (sp *SeriesPool) Generate() *Series {
-       sv := sp.pool.Get()
-       if sv == nil {
-               return &Series{}
-       }
-       return sv.(*Series)
-}
-
-// Release puts a Series back to the pool.
-func (sp *SeriesPool) Release(s *Series) {
-       s.reset()
-       sp.pool.Put(s)
-}
-
 // SeriesList is a collection of Series.
 type SeriesList []*Series
 
diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go
new file mode 100644
index 00000000..8f44c8a2
--- /dev/null
+++ b/pkg/pool/pool.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pool
+
+import (
+       "fmt"
+       "sync"
+       "sync/atomic"
+)
+
+var poolMap = sync.Map{}
+
+func Register[T any](name string) *Synced[T] {
+       p := new(Synced[T])
+       if _, ok := poolMap.LoadOrStore(name, p); ok {
+               panic(fmt.Sprintf("duplicated pool: %s", name))
+       }
+       return p
+}
+
+func AllRefsCount() map[string]int {
+       result := make(map[string]int)
+       poolMap.Range(func(key, value any) bool {
+               result[key.(string)] = value.(Trackable).RefsCount()
+               return true
+       })
+       return result
+}
+
+type Trackable interface {
+       RefsCount() int
+}
+
+type Synced[T any] struct {
+       sync.Pool
+       refs atomic.Int32
+}
+
+func (p *Synced[T]) Get() T {
+       v := p.Pool.Get()
+       p.refs.Add(1)
+       if v == nil {
+               var t T
+               return t
+       }
+       return v.(T)
+}
+
+func (p *Synced[T]) Put(v T) {
+       p.Pool.Put(v)
+       p.refs.Add(-1)
+}
+
+func (p *Synced[T]) RefsCount() int {
+       return int(p.refs.Load())
+}
diff --git a/pkg/test/gmatcher/gmatcher.go b/pkg/test/gmatcher/gmatcher.go
new file mode 100644
index 00000000..ae761529
--- /dev/null
+++ b/pkg/test/gmatcher/gmatcher.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gmatcher
+
+import (
+       "fmt"
+
+       "github.com/onsi/gomega"
+)
+
+// HaveZeroRef returns a matcher that checks if all pools have 0 references.
+func HaveZeroRef() gomega.OmegaMatcher {
+       return &ZeroRefMatcher{}
+}
+
+var _ gomega.OmegaMatcher = &ZeroRefMatcher{}
+
+type ZeroRefMatcher struct{}
+
+// FailureMessage implements types.GomegaMatcher.
+func (p *ZeroRefMatcher) FailureMessage(actual interface{}) (message string) {
+       return fmt.Sprintf("expected all pools to have 0 references, got %v", 
actual)
+}
+
+// Match implements types.GomegaMatcher.
+func (p *ZeroRefMatcher) Match(actual interface{}) (success bool, err error) {
+       data, ok := actual.(map[string]int)
+       if !ok {
+               return false, fmt.Errorf("expected map[string]int, got %T", 
actual)
+       }
+       for poolName, refers := range data {
+               if refers > 0 {
+                       return false, fmt.Errorf("pool %s has %d references", 
poolName, refers)
+               }
+       }
+       return true, nil
+}
+
+// NegatedFailureMessage implements types.GomegaMatcher.
+func (p *ZeroRefMatcher) NegatedFailureMessage(actual interface{}) (message 
string) {
+       return fmt.Sprintf("expected at least one pool to have references, got 
%v", actual)
+}
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index f50dc4e1..3077c510 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -108,14 +108,12 @@ func loadData(md *commonv1.Metadata, measure 
measurev1.MeasureService_WriteClien
        content, err := dataFS.ReadFile("testdata/" + dataFile)
        gm.Expect(err).ShouldNot(gm.HaveOccurred())
        gm.Expect(json.Unmarshal(content, 
&templates)).ShouldNot(gm.HaveOccurred())
-       nano := baseTime.UnixNano()
        for i, template := range templates {
                rawDataPointValue, errMarshal := json.Marshal(template)
                gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
                dataPointValue := &measurev1.DataPointValue{}
                gm.Expect(protojson.Unmarshal(rawDataPointValue, 
dataPointValue)).ShouldNot(gm.HaveOccurred())
                dataPointValue.Timestamp = 
timestamppb.New(baseTime.Add(-time.Duration(len(templates)-i-1) * interval))
-               dataPointValue.Version = nano + int64(i)
                gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, 
DataPoint: dataPointValue, MessageId: uint64(time.Now().UnixNano())})).
                        Should(gm.Succeed())
        }
diff --git a/test/integration/distributed/query/query_suite_test.go 
b/test/integration/distributed/query/query_suite_test.go
index 0c78a14f..3d8eaad3 100644
--- a/test/integration/distributed/query/query_suite_test.go
+++ b/test/integration/distributed/query/query_suite_test.go
@@ -35,8 +35,10 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
@@ -132,4 +134,5 @@ var _ = SynchronizedAfterSuite(func() {
 }, func() {
        deferFunc()
        Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
 })
diff --git a/test/integration/etcd/client_test.go 
b/test/integration/etcd/client_test.go
index a78d8345..ddfb4797 100644
--- a/test/integration/etcd/client_test.go
+++ b/test/integration/etcd/client_test.go
@@ -36,8 +36,10 @@ import (
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
 )
@@ -76,6 +78,7 @@ var _ = Describe("Client Test", func() {
        AfterEach(func() {
                dirSpaceDef()
                Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
        })
 
        It("should be using user/password connect etcd server successfully", 
func() {
diff --git a/test/integration/load/load_suite_test.go 
b/test/integration/load/load_suite_test.go
index 1858e380..84dccc38 100644
--- a/test/integration/load/load_suite_test.go
+++ b/test/integration/load/load_suite_test.go
@@ -36,8 +36,10 @@ import (
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        cases_stream_data 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
@@ -157,6 +159,7 @@ var _ = Describe("Load Test Suit", func() {
                }
                deferFunc()
                Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
        })
 })
 
diff --git a/test/integration/standalone/cold_query/query_suite_test.go 
b/test/integration/standalone/cold_query/query_suite_test.go
index 18e832ad..a28c145e 100644
--- a/test/integration/standalone/cold_query/query_suite_test.go
+++ b/test/integration/standalone/cold_query/query_suite_test.go
@@ -29,7 +29,9 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -89,4 +91,5 @@ var _ = SynchronizedAfterSuite(func() {
 }, func() {
        deferFunc()
        Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
 })
diff --git a/test/integration/standalone/other/measure_test.go 
b/test/integration/standalone/other/measure_test.go
index a2ab6e81..a0ced9ac 100644
--- a/test/integration/standalone/other/measure_test.go
+++ b/test/integration/standalone/other/measure_test.go
@@ -27,7 +27,9 @@ import (
        "google.golang.org/grpc/credentials/insecure"
 
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -57,6 +59,7 @@ var _ = g.Describe("Query service_cpm_minute", func() {
                gm.Expect(conn.Close()).To(gm.Succeed())
                deferFn()
                gm.Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               gm.Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
        })
        g.It("queries service_cpm_minute by id after updating", func() {
                casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data1.json", baseTime, interval)
diff --git a/test/integration/standalone/other/property_test.go 
b/test/integration/standalone/other/property_test.go
index ab217059..164b3ce3 100644
--- a/test/integration/standalone/other/property_test.go
+++ b/test/integration/standalone/other/property_test.go
@@ -32,7 +32,9 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
 )
 
@@ -57,6 +59,7 @@ var _ = Describe("Property application", func() {
                Expect(conn.Close()).To(Succeed())
                deferFn()
                Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
        })
        It("applies properties", func() {
                md := &propertyv1.Metadata{
diff --git a/test/integration/standalone/query/query_suite_test.go 
b/test/integration/standalone/query/query_suite_test.go
index a1e684b2..765cd893 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/integration/standalone/query/query_suite_test.go
@@ -29,7 +29,9 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -90,4 +92,5 @@ var _ = SynchronizedAfterSuite(func() {
 }, func() {
        deferFunc()
        Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
 })
diff --git 
a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go 
b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
index f6fcb14c..346b8918 100644
--- a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
+++ b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
@@ -29,8 +29,10 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -103,4 +105,5 @@ var _ = SynchronizedAfterSuite(func() {
 }, func() {
        deferFunc()
        Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
 })

Reply via email to