This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new b4fb8ff3 Improve block metadata reading performance (#407)
b4fb8ff3 is described below

commit b4fb8ff36ef6650ada8846c04a62735cdaa52391
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sun Mar 10 13:49:58 2024 +0800

    Improve block metadata reading performance (#407)
---
 banyand/measure/block_metadata.go | 185 +++++++++++++++++++++-----------------
 banyand/measure/part.go           |  10 +--
 banyand/measure/part_iter.go      |  14 +--
 banyand/measure/part_iter_test.go |   4 +-
 banyand/measure/query.go          |   5 +-
 banyand/measure/query_test.go     |   4 +-
 banyand/measure/tstable.go        |   4 +-
 banyand/measure/tstable_test.go   |   4 +-
 banyand/stream/block_metadata.go  | 145 +++++++++++++++++-------------
 banyand/stream/part.go            |   4 +-
 banyand/stream/part_iter.go       |  14 +--
 banyand/stream/part_iter_test.go  |   5 +-
 banyand/stream/query.go           |   5 +-
 banyand/stream/query_test.go      |   5 +-
 banyand/stream/tstable.go         |   4 +-
 banyand/stream/tstable_test.go    |   4 +-
 16 files changed, 234 insertions(+), 182 deletions(-)
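
In short, this commit threads a pooled blockMetadataArray through partIter and tstIter so primary-block reads reuse a single []blockMetadata buffer per query instead of allocating a fresh slice each time, and it moves part-file removal in decRef into a goroutine so dropping the last reference no longer blocks on disk I/O. Below is a minimal, self-contained sketch of that pooling pattern; the pool helpers mirror the diff that follows, while the trimmed blockMetadata struct and the main function are illustrative stand-ins rather than the real types.

package main

import (
        "fmt"
        "sync"
)

// Trimmed stand-in for the real blockMetadata in banyand/measure and
// banyand/stream; only the pooling pattern from the diff is shown here.
type blockMetadata struct {
        seriesID uint64
        count    uint64
}

// blockMetadataArray wraps a reusable slice so a query can hand one
// buffer to its iterators and return it to the pool when it finishes.
type blockMetadataArray struct {
        arr []blockMetadata
}

var blockMetadataArrayPool sync.Pool

func generateBlockMetadataArray() *blockMetadataArray {
        v := blockMetadataArrayPool.Get()
        if v == nil {
                return &blockMetadataArray{}
        }
        return v.(*blockMetadataArray)
}

func releaseBlockMetadataArray(bma *blockMetadataArray) {
        // Keep the backing capacity, drop the elements, so the next
        // query grows from the previous high-water mark instead of zero.
        bma.arr = bma.arr[:0]
        blockMetadataArrayPool.Put(bma)
}

func main() {
        bma := generateBlockMetadataArray()
        defer releaseBlockMetadataArray(bma)

        // An iterator appends decoded block metadata into bma.arr
        // (via bma.arr[:0]) instead of allocating per primary block.
        bma.arr = append(bma.arr[:0], blockMetadata{seriesID: 1, count: 42})
        fmt.Println(len(bma.arr), cap(bma.arr))
}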

diff --git a/banyand/measure/block_metadata.go b/banyand/measure/block_metadata.go
index 6202c9eb..91584760 100644
--- a/banyand/measure/block_metadata.go
+++ b/banyand/measure/block_metadata.go
@@ -34,34 +34,34 @@ type dataBlock struct {
        size   uint64
 }
 
-func (h *dataBlock) reset() {
-       h.offset = 0
-       h.size = 0
+func (b *dataBlock) reset() {
+       b.offset = 0
+       b.size = 0
 }
 
-func (h *dataBlock) copyFrom(src *dataBlock) {
-       h.offset = src.offset
-       h.size = src.size
+func (b *dataBlock) copyFrom(src *dataBlock) {
+       b.offset = src.offset
+       b.size = src.size
 }
 
-func (h *dataBlock) marshal(dst []byte) []byte {
-       dst = encoding.VarUint64ToBytes(dst, h.offset)
-       dst = encoding.VarUint64ToBytes(dst, h.size)
+func (b *dataBlock) marshal(dst []byte) []byte {
+       dst = encoding.VarUint64ToBytes(dst, b.offset)
+       dst = encoding.VarUint64ToBytes(dst, b.size)
        return dst
 }
 
-func (h *dataBlock) unmarshal(src []byte) ([]byte, error) {
+func (b *dataBlock) unmarshal(src []byte) ([]byte, error) {
        src, n, err := encoding.BytesToVarUint64(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal offset: %w", err)
        }
-       h.offset = n
+       b.offset = n
 
        src, n, err = encoding.BytesToVarUint64(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal size: %w", err)
        }
-       h.size = n
+       b.size = n
        return src, nil
 }
 
@@ -75,84 +75,84 @@ type blockMetadata struct {
        count                 uint64
 }
 
-func (bh *blockMetadata) copyFrom(src *blockMetadata) {
-       bh.seriesID = src.seriesID
-       bh.uncompressedSizeBytes = src.uncompressedSizeBytes
-       bh.count = src.count
-       bh.timestamps.copyFrom(&src.timestamps)
+func (bm *blockMetadata) copyFrom(src *blockMetadata) {
+       bm.seriesID = src.seriesID
+       bm.uncompressedSizeBytes = src.uncompressedSizeBytes
+       bm.count = src.count
+       bm.timestamps.copyFrom(&src.timestamps)
        for k, db := range src.tagFamilies {
-               if bh.tagFamilies == nil {
-                       bh.tagFamilies = make(map[string]*dataBlock)
+               if bm.tagFamilies == nil {
+                       bm.tagFamilies = make(map[string]*dataBlock)
                }
-               bh.tagFamilies[k] = &dataBlock{}
-               bh.tagFamilies[k].copyFrom(db)
+               bm.tagFamilies[k] = &dataBlock{}
+               bm.tagFamilies[k].copyFrom(db)
        }
-       bh.field.copyFrom(&src.field)
+       bm.field.copyFrom(&src.field)
 }
 
-func (bh *blockMetadata) getTagFamilyMetadata(name string) *dataBlock {
-       if bh.tagFamilies == nil {
-               bh.tagFamilies = make(map[string]*dataBlock)
+func (bm *blockMetadata) getTagFamilyMetadata(name string) *dataBlock {
+       if bm.tagFamilies == nil {
+               bm.tagFamilies = make(map[string]*dataBlock)
        }
-       tf, ok := bh.tagFamilies[name]
+       tf, ok := bm.tagFamilies[name]
        if !ok {
                tf = &dataBlock{}
-               bh.tagFamilies[name] = tf
+               bm.tagFamilies[name] = tf
        }
        return tf
 }
 
-func (bh *blockMetadata) reset() {
-       bh.seriesID = 0
-       bh.uncompressedSizeBytes = 0
-       bh.count = 0
-       bh.timestamps.reset()
-       bh.field.reset()
-       for k := range bh.tagFamilies {
-               bh.tagFamilies[k].reset()
-               delete(bh.tagFamilies, k)
+func (bm *blockMetadata) reset() {
+       bm.seriesID = 0
+       bm.uncompressedSizeBytes = 0
+       bm.count = 0
+       bm.timestamps.reset()
+       bm.field.reset()
+       for k := range bm.tagFamilies {
+               bm.tagFamilies[k].reset()
+               delete(bm.tagFamilies, k)
        }
-       bh.tagProjection = bh.tagProjection[:0]
+       bm.tagProjection = bm.tagProjection[:0]
 }
 
-func (bh *blockMetadata) marshal(dst []byte) []byte {
-       dst = bh.seriesID.AppendToBytes(dst)
-       dst = encoding.VarUint64ToBytes(dst, bh.uncompressedSizeBytes)
-       dst = encoding.VarUint64ToBytes(dst, bh.count)
-       dst = bh.timestamps.marshal(dst)
-       dst = encoding.VarUint64ToBytes(dst, uint64(len(bh.tagFamilies)))
+func (bm *blockMetadata) marshal(dst []byte) []byte {
+       dst = bm.seriesID.AppendToBytes(dst)
+       dst = encoding.VarUint64ToBytes(dst, bm.uncompressedSizeBytes)
+       dst = encoding.VarUint64ToBytes(dst, bm.count)
+       dst = bm.timestamps.marshal(dst)
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(bm.tagFamilies)))
        // make sure the order of tagFamilies is stable
-       keys := make([]string, 0, len(bh.tagFamilies))
-       for k := range bh.tagFamilies {
+       keys := make([]string, 0, len(bm.tagFamilies))
+       for k := range bm.tagFamilies {
                keys = append(keys, k)
        }
        sort.Strings(keys)
        for _, name := range keys {
-               cf := bh.tagFamilies[name]
+               cf := bm.tagFamilies[name]
                dst = encoding.EncodeBytes(dst, convert.StringToBytes(name))
                dst = cf.marshal(dst)
        }
-       return bh.field.marshal(dst)
+       return bm.field.marshal(dst)
 }
 
-func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) {
+func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) {
        if len(src) < 8 {
                return nil, errors.New("cannot unmarshal blockMetadata from 
less than 8 bytes")
        }
-       bh.seriesID = common.SeriesID(encoding.BytesToUint64(src))
+       bm.seriesID = common.SeriesID(encoding.BytesToUint64(src))
        src = src[8:]
        src, n, err := encoding.BytesToVarUint64(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal uncompressedSizeBytes: 
%w", err)
        }
-       bh.uncompressedSizeBytes = n
+       bm.uncompressedSizeBytes = n
 
        src, n, err = encoding.BytesToVarUint64(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal count: %w", err)
        }
-       bh.count = n
-       src, err = bh.timestamps.unmarshal(src)
+       bm.count = n
+       src, err = bm.timestamps.unmarshal(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal timestampsMetadata: 
%w", err)
        }
@@ -161,8 +161,8 @@ func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) {
                return nil, fmt.Errorf("cannot unmarshal tagFamilies count: %w", err)
        }
        if n > 0 {
-               if bh.tagFamilies == nil {
-                       bh.tagFamilies = make(map[string]*dataBlock, n)
+               if bm.tagFamilies == nil {
+                       bm.tagFamilies = make(map[string]*dataBlock, n)
                }
                var nameBytes []byte
                for i := uint64(0); i < n; i++ {
@@ -176,21 +176,21 @@ func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) {
                        if err != nil {
                                return nil, fmt.Errorf("cannot unmarshal 
tagFamily dataBlock: %w", err)
                        }
-                       bh.tagFamilies[convert.BytesToString(nameBytes)] = tf
+                       bm.tagFamilies[convert.BytesToString(nameBytes)] = tf
                }
        }
-       src, err = bh.field.unmarshal(src)
+       src, err = bm.field.unmarshal(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal columnFamilyMetadata: 
%w", err)
        }
        return src, nil
 }
 
-func (bh blockMetadata) less(other blockMetadata) bool {
-       if bh.seriesID == other.seriesID {
-               return bh.timestamps.min < other.timestamps.min
+func (bm blockMetadata) less(other blockMetadata) bool {
+       if bm.seriesID == other.seriesID {
+               return bm.timestamps.min < other.timestamps.min
        }
-       return bh.seriesID < other.seriesID
+       return bm.seriesID < other.seriesID
 }
 
 func generateBlockMetadata() *blockMetadata {
@@ -201,13 +201,32 @@ func generateBlockMetadata() *blockMetadata {
        return v.(*blockMetadata)
 }
 
-func releaseBlockMetadata(bh *blockMetadata) {
-       bh.reset()
-       blockMetadataPool.Put(bh)
+func releaseBlockMetadata(bm *blockMetadata) {
+       bm.reset()
+       blockMetadataPool.Put(bm)
 }
 
 var blockMetadataPool sync.Pool
 
+type blockMetadataArray struct {
+       arr []blockMetadata
+}
+
+var blockMetadataArrayPool sync.Pool
+
+func generateBlockMetadataArray() *blockMetadataArray {
+       v := blockMetadataArrayPool.Get()
+       if v == nil {
+               return &blockMetadataArray{}
+       }
+       return v.(*blockMetadataArray)
+}
+
+func releaseBlockMetadataArray(bma *blockMetadataArray) {
+       bma.arr = bma.arr[:0]
+       blockMetadataArrayPool.Put(bma)
+}
+
 type timestampsMetadata struct {
        dataBlock
        min        int64
@@ -215,38 +234,38 @@ type timestampsMetadata struct {
        encodeType encoding.EncodeType
 }
 
-func (th *timestampsMetadata) reset() {
-       th.dataBlock.reset()
-       th.min = 0
-       th.max = 0
-       th.encodeType = 0
+func (tm *timestampsMetadata) reset() {
+       tm.dataBlock.reset()
+       tm.min = 0
+       tm.max = 0
+       tm.encodeType = 0
 }
 
-func (th *timestampsMetadata) copyFrom(src *timestampsMetadata) {
-       th.dataBlock.copyFrom(&src.dataBlock)
-       th.min = src.min
-       th.max = src.max
-       th.encodeType = src.encodeType
+func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) {
+       tm.dataBlock.copyFrom(&src.dataBlock)
+       tm.min = src.min
+       tm.max = src.max
+       tm.encodeType = src.encodeType
 }
 
-func (th *timestampsMetadata) marshal(dst []byte) []byte {
-       dst = th.dataBlock.marshal(dst)
-       dst = encoding.Uint64ToBytes(dst, uint64(th.min))
-       dst = encoding.Uint64ToBytes(dst, uint64(th.max))
-       dst = append(dst, byte(th.encodeType))
+func (tm *timestampsMetadata) marshal(dst []byte) []byte {
+       dst = tm.dataBlock.marshal(dst)
+       dst = encoding.Uint64ToBytes(dst, uint64(tm.min))
+       dst = encoding.Uint64ToBytes(dst, uint64(tm.max))
+       dst = append(dst, byte(tm.encodeType))
        return dst
 }
 
-func (th *timestampsMetadata) unmarshal(src []byte) ([]byte, error) {
-       src, err := th.dataBlock.unmarshal(src)
+func (tm *timestampsMetadata) unmarshal(src []byte) ([]byte, error) {
+       src, err := tm.dataBlock.unmarshal(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal dataBlock: %w", err)
        }
-       th.min = int64(encoding.BytesToUint64(src))
+       tm.min = int64(encoding.BytesToUint64(src))
        src = src[8:]
-       th.max = int64(encoding.BytesToUint64(src))
+       tm.max = int64(encoding.BytesToUint64(src))
        src = src[8:]
-       th.encodeType = encoding.EncodeType(src[0])
+       tm.encodeType = encoding.EncodeType(src[0])
        return src[1:], nil
 }
 
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index c6019539..20b6a5a7 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -239,15 +239,11 @@ func (pw *partWrapper) decRef() {
        if n > 0 {
                return
        }
-       if pw.mp != nil {
-               releaseMemPart(pw.mp)
-               pw.mp = nil
-               pw.p = nil
-               return
-       }
        pw.p.close()
        if pw.removable.Load() && pw.p.fileSystem != nil {
-               pw.p.fileSystem.MustRMAll(pw.p.path)
+               go func(pw *partWrapper) {
+                       pw.p.fileSystem.MustRMAll(pw.p.path)
+               }(pw)
        }
 }
 
diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go
index c44d114a..aa41f6d1 100644
--- a/banyand/measure/part_iter.go
+++ b/banyand/measure/part_iter.go
@@ -58,10 +58,11 @@ func (pi *partIter) reset() {
        pi.err = nil
 }
 
-func (pi *partIter) init(p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
+func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
        pi.reset()
        pi.p = p
 
+       pi.bms = bma.arr
        pi.sids = sids
        pi.minTimestamp = minTimestamp
        pi.maxTimestamp = maxTimestamp
@@ -145,13 +146,13 @@ func (pi *partIter) loadNextBlockMetadata() bool {
                        continue
                }
 
-               bm, err := pi.readPrimaryBlock(pbm)
+               var err error
+               pi.bms, err = pi.readPrimaryBlock(pi.bms[:0], pbm)
                if err != nil {
                        pi.err = fmt.Errorf("cannot read primary block for part 
%q at offset %d with size %d: %w",
                                &pi.p.partMetadata, pbm.offset, pbm.size, err)
                        return false
                }
-               pi.bms = bm
                return true
        }
        pi.err = io.EOF
@@ -177,7 +178,7 @@ func searchPBM(pbmIndex []primaryBlockMetadata, sid common.SeriesID) []primaryBl
        return pbmIndex[n-1:]
 }
 
-func (pi *partIter) readPrimaryBlock(mr *primaryBlockMetadata) ([]blockMetadata, error) {
+func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetadata) ([]blockMetadata, error) {
        pi.compressedPrimaryBuf = bytes.ResizeOver(pi.compressedPrimaryBuf, int(mr.size))
        fs.MustReadData(pi.p.primary, int64(mr.offset), pi.compressedPrimaryBuf)
 
@@ -186,12 +187,11 @@ func (pi *partIter) readPrimaryBlock(mr *primaryBlockMetadata) ([]blockMetadata,
        if err != nil {
                return nil, fmt.Errorf("cannot decompress index block: %w", err)
        }
-       bm := make([]blockMetadata, 0)
-       bm, err = unmarshalBlockMetadata(bm, pi.primaryBuf)
+       bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal index block: %w", err)
        }
-       return bm, nil
+       return bms, nil
 }
 
 func (pi *partIter) findBlock() bool {
diff --git a/banyand/measure/part_iter_test.go b/banyand/measure/part_iter_test.go
index bc2a3301..2451c9b0 100644
--- a/banyand/measure/part_iter_test.go
+++ b/banyand/measure/part_iter_test.go
@@ -110,12 +110,14 @@ func Test_partIter_nextBlock(t *testing.T) {
                },
        }
 
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        verifyPart := func(p *part) {
                                defer p.close()
                                pi := partIter{}
-                               pi.init(p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp)
+                               pi.init(bma, p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp)
 
                                var got []blockMetadata
                                for pi.nextBlock() {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 66160ae8..bf1c2d7c 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -110,12 +110,15 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1
                }
                result.snapshots = append(result.snapshots, s)
        }
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
        // TODO: cache tstIter
        var tstIter tstIter
+       defer tstIter.reset()
        originalSids := make([]common.SeriesID, len(sids))
        copy(originalSids, sids)
        sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
-       tstIter.init(parts, sids, qo.minTimestamp, qo.maxTimestamp)
+       tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
        if tstIter.Error() != nil {
                return nil, fmt.Errorf("cannot init tstIter: %w", 
tstIter.Error())
        }
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index e88ed586..56111323 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -543,6 +543,8 @@ func TestQueryResult(t *testing.T) {
                },
        }
 
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        verify := func(t *testing.T, tst *tsTable) {
@@ -561,7 +563,7 @@ func TestQueryResult(t *testing.T) {
                                        return sids[i] < tt.sids[j]
                                })
                                ti := &tstIter{}
-                               ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp)
+                               ti.init(bma, pp, sids, tt.minTimestamp, tt.maxTimestamp)
 
                                var result queryResult
                                // Query all tags
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 96bcfaef..e65aef6e 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -299,7 +299,7 @@ func (ti *tstIter) reset() {
        ti.nextBlockNoop = false
 }
 
-func (ti *tstIter) init(parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
+func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
        ti.reset()
        ti.parts = parts
 
@@ -308,7 +308,7 @@ func (ti *tstIter) init(parts []*part, sids []common.SeriesID, minTimestamp, max
        }
        ti.piPool = ti.piPool[:len(ti.parts)]
        for i, p := range ti.parts {
-               ti.piPool[i].init(p, sids, minTimestamp, maxTimestamp)
+               ti.piPool[i].init(bma, p, sids, minTimestamp, maxTimestamp)
        }
 
        ti.piHeap = ti.piHeap[:0]
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index bbbae3a8..a37d1a9f 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -136,6 +136,8 @@ func Test_tstIter(t *testing.T) {
                minTimestamp int64
                maxTimestamp int64
        }
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
 
        verify := func(t *testing.T, tt testCtx, tst *tsTable) uint64 {
                defer tst.Close()
@@ -147,7 +149,7 @@ func Test_tstIter(t *testing.T) {
                pp, n := s.getParts(nil, tt.minTimestamp, tt.maxTimestamp)
                require.Equal(t, len(s.parts), n)
                ti := &tstIter{}
-               ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
+               ti.init(bma, pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
                var got []blockMetadata
                for ti.nextBlock() {
                        if ti.piHeap[0].curBlock.seriesID == 0 {
diff --git a/banyand/stream/block_metadata.go b/banyand/stream/block_metadata.go
index 3366ac1e..08968b7f 100644
--- a/banyand/stream/block_metadata.go
+++ b/banyand/stream/block_metadata.go
@@ -34,34 +34,34 @@ type dataBlock struct {
        size   uint64
 }
 
-func (h *dataBlock) reset() {
-       h.offset = 0
-       h.size = 0
+func (d *dataBlock) reset() {
+       d.offset = 0
+       d.size = 0
 }
 
-func (h *dataBlock) copyFrom(src *dataBlock) {
-       h.offset = src.offset
-       h.size = src.size
+func (d *dataBlock) copyFrom(src *dataBlock) {
+       d.offset = src.offset
+       d.size = src.size
 }
 
-func (h *dataBlock) marshal(dst []byte) []byte {
-       dst = encoding.VarUint64ToBytes(dst, h.offset)
-       dst = encoding.VarUint64ToBytes(dst, h.size)
+func (d *dataBlock) marshal(dst []byte) []byte {
+       dst = encoding.VarUint64ToBytes(dst, d.offset)
+       dst = encoding.VarUint64ToBytes(dst, d.size)
        return dst
 }
 
-func (h *dataBlock) unmarshal(src []byte) ([]byte, error) {
+func (d *dataBlock) unmarshal(src []byte) ([]byte, error) {
        src, n, err := encoding.BytesToVarUint64(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal offset: %w", err)
        }
-       h.offset = n
+       d.offset = n
 
        src, n, err = encoding.BytesToVarUint64(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal size: %w", err)
        }
-       h.size = n
+       d.size = n
        return src, nil
 }
 
@@ -75,89 +75,89 @@ type blockMetadata struct {
        count                 uint64
 }
 
-func (bh *blockMetadata) copyFrom(src *blockMetadata) {
-       bh.seriesID = src.seriesID
-       bh.uncompressedSizeBytes = src.uncompressedSizeBytes
-       bh.count = src.count
-       bh.timestamps.copyFrom(&src.timestamps)
-       bh.elementIDs.copyFrom(&src.elementIDs)
+func (bm *blockMetadata) copyFrom(src *blockMetadata) {
+       bm.seriesID = src.seriesID
+       bm.uncompressedSizeBytes = src.uncompressedSizeBytes
+       bm.count = src.count
+       bm.timestamps.copyFrom(&src.timestamps)
+       bm.elementIDs.copyFrom(&src.elementIDs)
        for k, db := range src.tagFamilies {
-               if bh.tagFamilies == nil {
-                       bh.tagFamilies = make(map[string]*dataBlock)
+               if bm.tagFamilies == nil {
+                       bm.tagFamilies = make(map[string]*dataBlock)
                }
-               bh.tagFamilies[k] = &dataBlock{}
-               bh.tagFamilies[k].copyFrom(db)
+               bm.tagFamilies[k] = &dataBlock{}
+               bm.tagFamilies[k].copyFrom(db)
        }
 }
 
-func (bh *blockMetadata) getTagFamilyMetadata(name string) *dataBlock {
-       if bh.tagFamilies == nil {
-               bh.tagFamilies = make(map[string]*dataBlock)
+func (bm *blockMetadata) getTagFamilyMetadata(name string) *dataBlock {
+       if bm.tagFamilies == nil {
+               bm.tagFamilies = make(map[string]*dataBlock)
        }
-       tf, ok := bh.tagFamilies[name]
+       tf, ok := bm.tagFamilies[name]
        if !ok {
                tf = &dataBlock{}
-               bh.tagFamilies[name] = tf
+               bm.tagFamilies[name] = tf
        }
        return tf
 }
 
-func (bh *blockMetadata) reset() {
-       bh.seriesID = 0
-       bh.uncompressedSizeBytes = 0
-       bh.count = 0
-       bh.timestamps.reset()
-       bh.elementIDs.reset()
-       for k := range bh.tagFamilies {
-               bh.tagFamilies[k].reset()
-               delete(bh.tagFamilies, k)
+func (bm *blockMetadata) reset() {
+       bm.seriesID = 0
+       bm.uncompressedSizeBytes = 0
+       bm.count = 0
+       bm.timestamps.reset()
+       bm.elementIDs.reset()
+       for k := range bm.tagFamilies {
+               bm.tagFamilies[k].reset()
+               delete(bm.tagFamilies, k)
        }
-       bh.tagProjection = bh.tagProjection[:0]
+       bm.tagProjection = bm.tagProjection[:0]
 }
 
-func (bh *blockMetadata) marshal(dst []byte) []byte {
-       dst = bh.seriesID.AppendToBytes(dst)
-       dst = encoding.VarUint64ToBytes(dst, bh.uncompressedSizeBytes)
-       dst = encoding.VarUint64ToBytes(dst, bh.count)
-       dst = bh.timestamps.marshal(dst)
-       dst = bh.elementIDs.marshal(dst)
-       dst = encoding.VarUint64ToBytes(dst, uint64(len(bh.tagFamilies)))
+func (bm *blockMetadata) marshal(dst []byte) []byte {
+       dst = bm.seriesID.AppendToBytes(dst)
+       dst = encoding.VarUint64ToBytes(dst, bm.uncompressedSizeBytes)
+       dst = encoding.VarUint64ToBytes(dst, bm.count)
+       dst = bm.timestamps.marshal(dst)
+       dst = bm.elementIDs.marshal(dst)
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(bm.tagFamilies)))
        // make sure the order of tagFamilies is stable
-       keys := make([]string, 0, len(bh.tagFamilies))
-       for k := range bh.tagFamilies {
+       keys := make([]string, 0, len(bm.tagFamilies))
+       for k := range bm.tagFamilies {
                keys = append(keys, k)
        }
        sort.Strings(keys)
        for _, name := range keys {
-               cf := bh.tagFamilies[name]
+               cf := bm.tagFamilies[name]
                dst = encoding.EncodeBytes(dst, convert.StringToBytes(name))
                dst = cf.marshal(dst)
        }
        return dst
 }
 
-func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) {
+func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) {
        if len(src) < 8 {
                return nil, errors.New("cannot unmarshal blockMetadata from 
less than 8 bytes")
        }
-       bh.seriesID = common.SeriesID(encoding.BytesToUint64(src))
+       bm.seriesID = common.SeriesID(encoding.BytesToUint64(src))
        src = src[8:]
        src, n, err := encoding.BytesToVarUint64(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal uncompressedSizeBytes: 
%w", err)
        }
-       bh.uncompressedSizeBytes = n
+       bm.uncompressedSizeBytes = n
 
        src, n, err = encoding.BytesToVarUint64(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal count: %w", err)
        }
-       bh.count = n
-       src, err = bh.timestamps.unmarshal(src)
+       bm.count = n
+       src, err = bm.timestamps.unmarshal(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal timestampsMetadata: 
%w", err)
        }
-       src, err = bh.elementIDs.unmarshal(src)
+       src, err = bm.elementIDs.unmarshal(src)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal elementIDsMetadata: 
%w", err)
        }
@@ -166,8 +166,8 @@ func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) {
                return nil, fmt.Errorf("cannot unmarshal tagFamilies count: %w", err)
        }
        if n > 0 {
-               if bh.tagFamilies == nil {
-                       bh.tagFamilies = make(map[string]*dataBlock, n)
+               if bm.tagFamilies == nil {
+                       bm.tagFamilies = make(map[string]*dataBlock, n)
                }
                var nameBytes []byte
                for i := uint64(0); i < n; i++ {
@@ -181,7 +181,7 @@ func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) {
                        if err != nil {
                                return nil, fmt.Errorf("cannot unmarshal 
tagFamily dataBlock: %w", err)
                        }
-                       bh.tagFamilies[convert.BytesToString(nameBytes)] = tf
+                       bm.tagFamilies[convert.BytesToString(nameBytes)] = tf
                }
        }
        if err != nil {
@@ -190,11 +190,11 @@ func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) {
        return src, nil
 }
 
-func (bh blockMetadata) less(other blockMetadata) bool {
-       if bh.seriesID == other.seriesID {
-               return bh.timestamps.min < other.timestamps.min
+func (bm blockMetadata) less(other blockMetadata) bool {
+       if bm.seriesID == other.seriesID {
+               return bm.timestamps.min < other.timestamps.min
        }
-       return bh.seriesID < other.seriesID
+       return bm.seriesID < other.seriesID
 }
 
 func generateBlockMetadata() *blockMetadata {
@@ -205,13 +205,32 @@ func generateBlockMetadata() *blockMetadata {
        return v.(*blockMetadata)
 }
 
-func releaseBlockMetadata(bh *blockMetadata) {
-       bh.reset()
-       blockMetadataPool.Put(bh)
+func releaseBlockMetadata(bm *blockMetadata) {
+       bm.reset()
+       blockMetadataPool.Put(bm)
 }
 
 var blockMetadataPool sync.Pool
 
+type blockMetadataArray struct {
+       arr []blockMetadata
+}
+
+var blockMetadataArrayPool sync.Pool
+
+func generateBlockMetadataArray() *blockMetadataArray {
+       v := blockMetadataArrayPool.Get()
+       if v == nil {
+               return &blockMetadataArray{}
+       }
+       return v.(*blockMetadataArray)
+}
+
+func releaseBlockMetadataArray(bma *blockMetadataArray) {
+       bma.arr = bma.arr[:0]
+       blockMetadataArrayPool.Put(bma)
+}
+
 type timestampsMetadata struct {
        dataBlock
        min        int64
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index 95a12297..59af3462 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -410,7 +410,9 @@ func (pw *partWrapper) decRef() {
        }
        pw.p.close()
        if pw.removable.Load() && pw.p.fileSystem != nil {
-               pw.p.fileSystem.MustRMAll(pw.p.path)
+               go func(pw *partWrapper) {
+                       pw.p.fileSystem.MustRMAll(pw.p.path)
+               }(pw)
        }
 }
 
diff --git a/banyand/stream/part_iter.go b/banyand/stream/part_iter.go
index 9cd9df3a..1b53439a 100644
--- a/banyand/stream/part_iter.go
+++ b/banyand/stream/part_iter.go
@@ -58,10 +58,11 @@ func (pi *partIter) reset() {
        pi.err = nil
 }
 
-func (pi *partIter) init(p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
+func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
        pi.reset()
        pi.p = p
 
+       pi.bms = bma.arr
        pi.sids = sids
        pi.minTimestamp = minTimestamp
        pi.maxTimestamp = maxTimestamp
@@ -145,13 +146,13 @@ func (pi *partIter) loadNextBlockMetadata() bool {
                        continue
                }
 
-               bm, err := pi.readPrimaryBlock(pbm)
+               var err error
+               pi.bms, err = pi.readPrimaryBlock(pi.bms[:0], pbm)
                if err != nil {
                        pi.err = fmt.Errorf("cannot read primary block for part 
%q at offset %d with size %d: %w",
                                &pi.p.partMetadata, pbm.offset, pbm.size, err)
                        return false
                }
-               pi.bms = bm
                return true
        }
        pi.err = io.EOF
@@ -177,7 +178,7 @@ func searchPBM(pbmIndex []primaryBlockMetadata, sid common.SeriesID) []primaryBl
        return pbmIndex[n-1:]
 }
 
-func (pi *partIter) readPrimaryBlock(mr *primaryBlockMetadata) ([]blockMetadata, error) {
+func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetadata) ([]blockMetadata, error) {
        pi.compressedPrimaryBuf = bytes.ResizeOver(pi.compressedPrimaryBuf, int(mr.size))
        fs.MustReadData(pi.p.primary, int64(mr.offset), pi.compressedPrimaryBuf)
 
@@ -186,12 +187,11 @@ func (pi *partIter) readPrimaryBlock(mr *primaryBlockMetadata) ([]blockMetadata,
        if err != nil {
                return nil, fmt.Errorf("cannot decompress index block: %w", err)
        }
-       bm := make([]blockMetadata, 0)
-       bm, err = unmarshalBlockMetadata(bm, pi.primaryBuf)
+       bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf)
        if err != nil {
                return nil, fmt.Errorf("cannot unmarshal index block: %w", err)
        }
-       return bm, nil
+       return bms, nil
 }
 
 func (pi *partIter) findBlock() bool {
diff --git a/banyand/stream/part_iter_test.go b/banyand/stream/part_iter_test.go
index b0cc2174..8a600cc4 100644
--- a/banyand/stream/part_iter_test.go
+++ b/banyand/stream/part_iter_test.go
@@ -109,13 +109,14 @@ func Test_partIter_nextBlock(t *testing.T) {
                        wantErr: nil,
                },
        }
-
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        verifyPart := func(p *part) {
                                defer p.close()
                                pi := partIter{}
-                               pi.init(p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp)
+                               pi.init(bma, p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp)
 
                                var got []blockMetadata
                                for pi.nextBlock() {
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index f8ae2167..92aca49b 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -546,12 +546,15 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S
                }
                result.snapshots = append(result.snapshots, s)
        }
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
        // TODO: cache tstIter
        var tstIter tstIter
+       defer tstIter.reset()
        originalSids := make([]common.SeriesID, len(sids))
        copy(originalSids, sids)
        sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
-       tstIter.init(parts, sids, qo.minTimestamp, qo.maxTimestamp)
+       tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
        if tstIter.Error() != nil {
                return nil, fmt.Errorf("cannot init tstIter: %w", 
tstIter.Error())
        }
diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go
index 4d4d2bc5..fa3e1c61 100644
--- a/banyand/stream/query_test.go
+++ b/banyand/stream/query_test.go
@@ -326,7 +326,8 @@ func TestQueryResult(t *testing.T) {
                        }},
                },
        }
-
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        verify := func(t *testing.T, tst *tsTable) {
@@ -345,7 +346,7 @@ func TestQueryResult(t *testing.T) {
                                        return sids[i] < tt.sids[j]
                                })
                                ti := &tstIter{}
-                               ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp)
+                               ti.init(bma, pp, sids, tt.minTimestamp, tt.maxTimestamp)
 
                                var result queryResult
                                for ti.nextBlock() {
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 35610207..272c956c 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -327,7 +327,7 @@ func (ti *tstIter) reset() {
        ti.nextBlockNoop = false
 }
 
-func (ti *tstIter) init(parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
+func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
        ti.reset()
        ti.parts = parts
 
@@ -336,7 +336,7 @@ func (ti *tstIter) init(parts []*part, sids []common.SeriesID, minTimestamp, max
        }
        ti.piPool = ti.piPool[:len(ti.parts)]
        for i, p := range ti.parts {
-               ti.piPool[i].init(p, sids, minTimestamp, maxTimestamp)
+               ti.piPool[i].init(bma, p, sids, minTimestamp, maxTimestamp)
        }
 
        ti.piHeap = ti.piHeap[:0]
diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go
index cec9d389..de3bef3b 100644
--- a/banyand/stream/tstable_test.go
+++ b/banyand/stream/tstable_test.go
@@ -135,6 +135,8 @@ func Test_tstIter(t *testing.T) {
                minTimestamp int64
                maxTimestamp int64
        }
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
 
        verify := func(t *testing.T, tt testCtx, tst *tsTable) uint64 {
                defer tst.Close()
@@ -146,7 +148,7 @@ func Test_tstIter(t *testing.T) {
                pp, n := s.getParts(nil, tt.minTimestamp, tt.maxTimestamp)
                require.Equal(t, len(s.parts), n)
                ti := &tstIter{}
-               ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
+               ti.init(bma, pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
                var got []blockMetadata
                for ti.nextBlock() {
                        if ti.piHeap[0].curBlock.seriesID == 0 {
