This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e11b3ef847597def13103d76ab61aec64ea4dfbe
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Aug 24 08:52:11 2025 +0800

    Refactor block and part handling: Remove mustInitFromElements method from
    block, update MustWriteElements to accept data directly, and enhance part
    to read and decode metadata and tags. Improve memory management and
    streamline element initialization in tests.
---
 banyand/internal/sidx/block.go        |  52 +--
 banyand/internal/sidx/block_test.go   |  69 ---
 banyand/internal/sidx/block_writer.go |  21 +-
 banyand/internal/sidx/metadata.go     |  56 ++-
 banyand/internal/sidx/part.go         | 207 ++++++++-
 banyand/internal/sidx/part_test.go    | 832 +++++++++++++++-------------------
 6 files changed, 619 insertions(+), 618 deletions(-)
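
Note: the central API change in this commit is that blockWriter.MustWriteElements now receives the element payloads directly instead of fabricating placeholder data. A minimal caller sketch in Go; the values and the blockWriter setup are hypothetical, while every identifier comes from the diff below:

    sid := common.SeriesID(1)
    userKeys := []int64{100, 101} // sorted within the series
    data := [][]byte{[]byte("data1"), []byte("data2")}
    tags := [][]tag{
            {{name: "service", value: []byte("web"), valueType: pbv1.ValueTypeStr, indexed: true}},
            {{name: "service", value: []byte("web"), valueType: pbv1.ValueTypeStr, indexed: true}},
    }
    bw.MustWriteElements(sid, userKeys, data, tags) // one block for this series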

diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index a0ece632..cc638542 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -95,38 +95,6 @@ func (b *block) reset() {
        b.pooled = false
 }
 
-// mustInitFromElements initializes block from sorted elements.
-func (b *block) mustInitFromElements(elems *elements) {
-       b.reset()
-       if elems.Len() == 0 {
-               return
-       }
-
-       // Verify elements are sorted
-       elems.assertSorted()
-
-       // Copy core data
-       b.userKeys = append(b.userKeys, elems.userKeys...)
-       b.data = append(b.data, elems.data...)
-
-       // Process tags
-       b.mustInitFromTags(elems.tags)
-}
-
-// assertSorted verifies that elements are sorted correctly.
-func (e *elements) assertSorted() {
-       for i := 1; i < e.Len(); i++ {
-               if e.seriesIDs[i] < e.seriesIDs[i-1] {
-                       panic(fmt.Sprintf("elements not sorted by seriesID: index %d (%d) < index %d (%d)",
-                               i, e.seriesIDs[i], i-1, e.seriesIDs[i-1]))
-               }
-               if e.seriesIDs[i] == e.seriesIDs[i-1] && e.userKeys[i] < e.userKeys[i-1] {
-                       panic(fmt.Sprintf("elements not sorted by userKey: index %d (%d) < index %d (%d) for seriesID %d",
-                               i, e.userKeys[i], i-1, e.userKeys[i-1], e.seriesIDs[i]))
-               }
-       }
-}
-
 // mustInitFromTags processes tag data for the block.
 func (b *block) mustInitFromTags(elementTags [][]tag) {
        if len(elementTags) == 0 {
@@ -280,8 +248,8 @@ func (b *block) mustWriteTo(sid common.SeriesID, bm *blockMetadata, ww *writers)
        bm.uncompressedSize = b.uncompressedSizeBytes()
        bm.count = uint64(b.Len())
 
-       // Write user keys to keys.bin
-       mustWriteKeysTo(&bm.keysBlock, b.userKeys, &ww.keysWriter)
+       // Write user keys to keys.bin and capture encoding information
+       bm.keysEncodeType, bm.minKey = mustWriteKeysTo(&bm.keysBlock, b.userKeys, &ww.keysWriter)
 
        // Write data payloads to data.bin
        mustWriteDataTo(&bm.dataBlock, b.data, &ww.dataWriter)
@@ -346,13 +314,13 @@ func (b *block) mustWriteTag(tagName string, td *tagData, bm *blockMetadata, ww
        tmw.MustWrite(bb.Buf)
 
        // Update block metadata
-       tagMeta := bm.getTagMetadata(tagName)
-       tagMeta.offset = tmw.bytesWritten - uint64(len(bb.Buf))
-       tagMeta.size = uint64(len(bb.Buf))
+       offset := tmw.bytesWritten - uint64(len(bb.Buf))
+       size := uint64(len(bb.Buf))
+       bm.setTagMetadata(tagName, offset, size)
 }
 
-// mustWriteKeysTo writes user keys to the keys writer.
-func mustWriteKeysTo(kb *dataBlock, userKeys []int64, keysWriter *writer) {
+// mustWriteKeysTo writes user keys to the keys writer and returns encoding metadata.
+func mustWriteKeysTo(kb *dataBlock, userKeys []int64, keysWriter *writer) (encoding.EncodeType, int64) {
        bb := bigValuePool.Get()
        if bb == nil {
                bb = &bytes.Buffer{}
@@ -363,13 +331,17 @@ func mustWriteKeysTo(kb *dataBlock, userKeys []int64, keysWriter *writer) {
        }()
 
        // Encode user keys
-       bb.Buf, _, _ = encoding.Int64ListToBytes(bb.Buf[:0], userKeys)
+       var encodeType encoding.EncodeType
+       var firstValue int64
+       bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], userKeys)
 
        // Compress and write
        compressedData := zstd.Compress(nil, bb.Buf, 1)
        kb.offset = keysWriter.bytesWritten
        kb.size = uint64(len(compressedData))
        keysWriter.MustWrite(compressedData)
+
+       return encodeType, firstValue
 }
 
 // mustWriteDataTo writes data payloads to the data writer.
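
Note: the (encoding.EncodeType, int64) pair returned by mustWriteKeysTo is persisted in blockMetadata and is exactly what the read path feeds back into the decoder. A sketch of the round trip implied by this diff, assuming only the calls visible in it; roundTripKeys is a hypothetical helper:

    func roundTripKeys(userKeys []int64) ([]int64, error) {
            // Write side: encode the sorted keys, then zstd-compress.
            buf, encodeType, firstValue := encoding.Int64ListToBytes(nil, userKeys)
            compressed := zstd.Compress(nil, buf, 1)

            // Read side: decompress, then decode with the persisted metadata.
            plain, err := zstd.Decompress(nil, compressed)
            if err != nil {
                    return nil, err
            }
            return encoding.BytesToInt64List(nil, plain, encodeType, firstValue, len(userKeys))
    }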
diff --git a/banyand/internal/sidx/block_test.go b/banyand/internal/sidx/block_test.go
index 3b972a62..b3ed92c9 100644
--- a/banyand/internal/sidx/block_test.go
+++ b/banyand/internal/sidx/block_test.go
@@ -19,9 +19,6 @@ package sidx
 
 import (
        "testing"
-
-       "github.com/apache/skywalking-banyandb/api/common"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 func TestBlock_BasicOperations(t *testing.T) {
@@ -43,72 +40,6 @@ func TestBlock_BasicOperations(t *testing.T) {
        }
 }
 
-func TestBlock_InitFromElements(t *testing.T) {
-       // Create test elements
-       elems := generateElements()
-       defer releaseElements(elems)
-
-       // Add test data
-       elems.seriesIDs = append(elems.seriesIDs, common.SeriesID(1), common.SeriesID(1))
-       elems.userKeys = append(elems.userKeys, 100, 200)
-       elems.data = append(elems.data, []byte("data1"), []byte("data2"))
-
-       // Add test tags
-       tag1 := tag{
-               name:      "service",
-               value:     []byte("web-service"),
-               valueType: pbv1.ValueTypeStr,
-               indexed:   true,
-       }
-       tag2 := tag{
-               name:      "endpoint",
-               value:     []byte("/api/users"),
-               valueType: pbv1.ValueTypeStr,
-               indexed:   false,
-       }
-
-       elems.tags = append(elems.tags,
-               []tag{tag1, tag2},
-               []tag{tag1, tag2},
-       )
-
-       // Create block and initialize from elements
-       b := generateBlock()
-       defer releaseBlock(b)
-
-       b.mustInitFromElements(elems)
-
-       // Verify block state
-       if b.isEmpty() {
-               t.Error("Block should not be empty after initialization")
-       }
-
-       if b.Len() != 2 {
-               t.Errorf("Expected block length 2, got %d", b.Len())
-       }
-
-       if len(b.userKeys) != 2 {
-               t.Errorf("Expected 2 user keys, got %d", len(b.userKeys))
-       }
-
-       if b.userKeys[0] != 100 || b.userKeys[1] != 200 {
-               t.Errorf("User keys not properly set: got %v", b.userKeys)
-       }
-
-       // Verify tags were processed
-       if len(b.tags) != 2 {
-               t.Errorf("Expected 2 tags, got %d", len(b.tags))
-       }
-
-       if _, exists := b.tags["service"]; !exists {
-               t.Error("Expected 'service' tag to exist")
-       }
-
-       if _, exists := b.tags["endpoint"]; !exists {
-               t.Error("Expected 'endpoint' tag to exist")
-       }
-}
-
 func TestBlock_Validation(t *testing.T) {
        b := generateBlock()
        defer releaseBlock(b)
diff --git a/banyand/internal/sidx/block_writer.go b/banyand/internal/sidx/block_writer.go
index 76a99528..8067e009 100644
--- a/banyand/internal/sidx/block_writer.go
+++ b/banyand/internal/sidx/block_writer.go
@@ -239,29 +239,20 @@ func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path string
 }
 
 // MustWriteElements writes elements to the block writer.
-func (bw *blockWriter) MustWriteElements(sid common.SeriesID, userKeys []int64, tags [][]tag) {
+func (bw *blockWriter) MustWriteElements(sid common.SeriesID, userKeys []int64, data [][]byte, tags [][]tag) {
        if len(userKeys) == 0 {
                return
        }
 
        b := generateBlock()
        defer releaseBlock(b)
+       // Copy core data
+       b.userKeys = append(b.userKeys, userKeys...)
+       b.data = append(b.data, data...)
 
-       // Convert to elements format for initialization
-       es := generateElements()
-       defer releaseElements(es)
+       // Process tags
+       b.mustInitFromTags(tags)
 
-       es.seriesIDs = append(es.seriesIDs, sid)
-       es.userKeys = append(es.userKeys, userKeys...)
-       es.tags = append(es.tags, tags...)
-
-       // Create data slice with proper length
-       es.data = make([][]byte, len(userKeys))
-       for i := range es.data {
-               es.data[i] = nil // Placeholder data
-       }
-
-       b.mustInitFromElements(es)
        bw.mustWriteBlock(sid, b)
 }
 
diff --git a/banyand/internal/sidx/metadata.go b/banyand/internal/sidx/metadata.go
index 008ec728..85b7dcab 100644
--- a/banyand/internal/sidx/metadata.go
+++ b/banyand/internal/sidx/metadata.go
@@ -25,6 +25,7 @@ import (
        "io"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
@@ -55,6 +56,7 @@ type blockMetadata struct {
        maxKey           int64
        uncompressedSize uint64
        count            uint64
+       keysEncodeType   encoding.EncodeType
 }
 
 type blockMetadataArray struct {
@@ -150,6 +152,7 @@ func (bm *blockMetadata) reset() {
        bm.keysBlock = dataBlock{}
        bm.uncompressedSize = 0
        bm.count = 0
+       bm.keysEncodeType = 0
 
        // Clear maps but keep them allocated
        for k := range bm.tagsBlocks {
@@ -184,8 +187,8 @@ func (bm *blockMetadata) validate() error {
        if bm.dataBlock.size == 0 {
                return fmt.Errorf("invalid data block: size cannot be zero")
        }
-       if bm.keysBlock.size == 0 {
-               return fmt.Errorf("invalid keys block: size cannot be zero")
+       if bm.keysBlock.size == 0 && bm.keysEncodeType != encoding.EncodeTypeConst {
+               return fmt.Errorf("invalid keys block: size cannot be zero unless using const encoding")
        }
        return nil
 }
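
Note: the relaxed check above rests on the behavior the encode-type name suggests: when every key in a block is identical, Int64ListToBytes can describe the whole list with just the encode type and first value, leaving the payload empty. Illustration under that assumption (not verified against the encoding package):

    keys := []int64{42, 42, 42, 42}
    buf, encodeType, first := encoding.Int64ListToBytes(nil, keys)
    // Assumed outcome: encodeType == encoding.EncodeTypeConst, first == 42,
    // len(buf) == 0, which is why a zero-size keys block must remain valid.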
@@ -297,6 +300,22 @@ func (bm *blockMetadata) marshal() ([]byte, error) {
                return nil, fmt.Errorf("failed to write keys block size: %w", 
err)
        }
 
+       // Write keys encoding information
+       if err := binary.Write(buf, binary.LittleEndian, 
byte(bm.keysEncodeType)); err != nil {
+               return nil, fmt.Errorf("failed to write keys encode type: %w", 
err)
+       }
+       if err := binary.Write(buf, binary.LittleEndian, bm.minKey); err != nil 
{
+               return nil, fmt.Errorf("failed to write keys first value: %w", 
err)
+       }
+
+       // Write count and uncompressed size
+       if err := binary.Write(buf, binary.LittleEndian, bm.count); err != nil {
+               return nil, fmt.Errorf("failed to write count: %w", err)
+       }
+       if err := binary.Write(buf, binary.LittleEndian, bm.uncompressedSize); 
err != nil {
+               return nil, fmt.Errorf("failed to write uncompressed size: %w", 
err)
+       }
+
        // Write tag blocks count
        if err := binary.Write(buf, binary.LittleEndian, 
uint32(len(bm.tagsBlocks))); err != nil {
                return nil, fmt.Errorf("failed to write tag blocks count: %w", 
err)
@@ -366,6 +385,28 @@ func unmarshalBlockMetadata(data []byte) (*blockMetadata, error) {
                return nil, fmt.Errorf("failed to read keys block size: %w", err)
        }
 
+       // Read keys encoding information
+       var encodeTypeByte byte
+       if err := binary.Read(buf, binary.LittleEndian, &encodeTypeByte); err != nil {
+               releaseBlockMetadata(bm)
+               return nil, fmt.Errorf("failed to read keys encode type: %w", err)
+       }
+       bm.keysEncodeType = encoding.EncodeType(encodeTypeByte)
+       if err := binary.Read(buf, binary.LittleEndian, &bm.minKey); err != nil {
+               releaseBlockMetadata(bm)
+               return nil, fmt.Errorf("failed to read keys first value: %w", err)
+       }
+
+       // Read count and uncompressed size
+       if err := binary.Read(buf, binary.LittleEndian, &bm.count); err != nil {
+               releaseBlockMetadata(bm)
+               return nil, fmt.Errorf("failed to read count: %w", err)
+       }
+       if err := binary.Read(buf, binary.LittleEndian, &bm.uncompressedSize); err != nil {
+               releaseBlockMetadata(bm)
+               return nil, fmt.Errorf("failed to read uncompressed size: %w", err)
+       }
+
        // Read tag blocks count
        var tagBlocksCount uint32
        if err := binary.Read(buf, binary.LittleEndian, &tagBlocksCount); err != nil {
@@ -467,12 +508,7 @@ func (bm *blockMetadata) addTagBlock(tagName string, offset, size uint64) {
        bm.tagsBlocks[tagName] = dataBlock{offset: offset, size: size}
 }
 
-// getTagMetadata gets or creates a tag metadata reference in the block metadata.
-func (bm *blockMetadata) getTagMetadata(tagName string) *dataBlock {
-       if _, exists := bm.tagsBlocks[tagName]; !exists {
-               bm.tagsBlocks[tagName] = dataBlock{}
-       }
-       // Return pointer to the dataBlock for the tag
-       block := bm.tagsBlocks[tagName]
-       return &block
+// setTagMetadata sets the tag metadata reference in the block metadata.
+func (bm *blockMetadata) setTagMetadata(tagName string, offset, size uint64) {
+       bm.tagsBlocks[tagName] = dataBlock{offset: offset, size: size}
 }
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index c08af669..f51cb719 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -24,8 +24,11 @@ import (
        "sort"
        "strings"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/pool"
@@ -58,6 +61,7 @@ type part struct {
        primary              fs.Reader
        data                 fs.Reader
        keys                 fs.Reader
+       meta                 fs.Reader
        fileSystem           fs.FileSystem
        tagData              map[string]fs.Reader
        tagMetadata          map[string]fs.Reader
@@ -80,6 +84,7 @@ func mustOpenPart(path string, fileSystem fs.FileSystem) *part {
        p.primary = mustOpenReader(filepath.Join(path, primaryFilename), fileSystem)
        p.data = mustOpenReader(filepath.Join(path, dataFilename), fileSystem)
        p.keys = mustOpenReader(filepath.Join(path, keysFilename), fileSystem)
+       p.meta = mustOpenReader(filepath.Join(path, metaFilename), fileSystem)
 
        // Load part metadata from meta.bin.
        if err := p.loadPartMetadata(); err != nil {
@@ -131,10 +136,10 @@ func (p *part) loadPartMetadata() error {
        return nil
 }
 
-// loadBlockMetadata reads and parses primary block metadata from primary.bin.
+// loadPrimaryBlockMetadata reads and parses primary block metadata from meta.bin.
 func (p *part) loadPrimaryBlockMetadata() error {
-       // Load primary block metadata from primary.bin file
-       p.primaryBlockMetadata = mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], p.primary)
+       // Load primary block metadata from meta.bin file (compressed primaryBlockMetadata)
+       p.primaryBlockMetadata = mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], p.meta)
        return nil
 }
 
@@ -225,6 +230,9 @@ func (p *part) close() {
        if p.keys != nil {
                fs.MustClose(p.keys)
        }
+       if p.meta != nil {
+               fs.MustClose(p.meta)
+       }
 
        // Close tag files.
        for _, reader := range p.tagData {
@@ -255,6 +263,188 @@ func mustOpenReader(filePath string, fileSystem fs.FileSystem) fs.Reader {
        return file
 }
 
+// readAll reads all blocks from the part and returns them as separate elements.
+// Each elements collection represents the data from a single block.
+func (p *part) readAll() ([]*elements, error) {
+       if len(p.primaryBlockMetadata) == 0 {
+               return nil, nil
+       }
+
+       result := make([]*elements, 0, len(p.primaryBlockMetadata))
+       compressedPrimaryBuf := make([]byte, 0, 1024)
+       primaryBuf := make([]byte, 0, 1024)
+       compressedDataBuf := make([]byte, 0, 1024)
+       dataBuf := make([]byte, 0, 1024)
+       compressedKeysBuf := make([]byte, 0, 1024)
+       keysBuf := make([]byte, 0, 1024)
+
+       bytesDecoder := &encoding.BytesBlockDecoder{}
+
+       for _, pbm := range p.primaryBlockMetadata {
+               // Read and decompress primary block metadata
+               compressedPrimaryBuf = bytes.ResizeOver(compressedPrimaryBuf, int(pbm.size))
+               fs.MustReadData(p.primary, int64(pbm.offset), compressedPrimaryBuf)
+
+               var err error
+               primaryBuf, err = zstd.Decompress(primaryBuf[:0], compressedPrimaryBuf)
+               if err != nil {
+                       // Clean up any elements created so far
+                       for _, e := range result {
+                               releaseElements(e)
+                       }
+                       return nil, fmt.Errorf("cannot decompress primary block: %w", err)
+               }
+
+               // Unmarshal block metadata
+               bm, err := unmarshalBlockMetadata(primaryBuf)
+               if err != nil {
+                       // Clean up any elements created so far
+                       for _, e := range result {
+                               releaseElements(e)
+                       }
+                       return nil, fmt.Errorf("cannot unmarshal block metadata: %w", err)
+               }
+
+               // Create elements for this block
+               elems := generateElements()
+
+               // Read user keys
+               compressedKeysBuf = bytes.ResizeOver(compressedKeysBuf, int(bm.keysBlock.size))
+               fs.MustReadData(p.keys, int64(bm.keysBlock.offset), compressedKeysBuf)
+
+               keysBuf, err = zstd.Decompress(keysBuf[:0], compressedKeysBuf)
+               if err != nil {
+                       releaseElements(elems)
+                       for _, e := range result {
+                               releaseElements(e)
+                       }
+                       return nil, fmt.Errorf("cannot decompress keys block: %w", err)
+               }
+
+               // Decode user keys using the stored encoding information
+               elems.userKeys, err = encoding.BytesToInt64List(elems.userKeys[:0], keysBuf, bm.keysEncodeType, bm.minKey, int(bm.count))
+               if err != nil {
+                       releaseElements(elems)
+                       for _, e := range result {
+                               releaseElements(e)
+                       }
+                       return nil, fmt.Errorf("cannot decode user keys: %w", err)
+               }
+
+               // Read data payloads
+               compressedDataBuf = bytes.ResizeOver(compressedDataBuf, int(bm.dataBlock.size))
+               fs.MustReadData(p.data, int64(bm.dataBlock.offset), compressedDataBuf)
+
+               dataBuf, err = zstd.Decompress(dataBuf[:0], compressedDataBuf)
+               if err != nil {
+                       releaseElements(elems)
+                       for _, e := range result {
+                               releaseElements(e)
+                       }
+                       return nil, fmt.Errorf("cannot decompress data block: %w", err)
+               }
+
+               // Decode data payloads
+               bytesDecoder.Reset()
+               elems.data, err = bytesDecoder.Decode(elems.data[:0], dataBuf, bm.count)
+               if err != nil {
+                       releaseElements(elems)
+                       for _, e := range result {
+                               releaseElements(e)
+                       }
+                       return nil, fmt.Errorf("cannot decode data payloads: %w", err)
+               }
+
+               // Initialize seriesIDs and tags slices
+               elems.seriesIDs = make([]common.SeriesID, int(bm.count))
+               elems.tags = make([][]tag, int(bm.count))
+
+               // Fill seriesIDs - all elements in this block have the same seriesID
+               for i := range elems.seriesIDs {
+                       elems.seriesIDs[i] = bm.seriesID
+               }
+
+               // Read tags for each tag name
+               for tagName := range bm.tagsBlocks {
+                       err = p.readBlockTags(tagName, bm, elems)
+                       if err != nil {
+                               releaseElements(elems)
+                               for _, e := range result {
+                                       releaseElements(e)
+                               }
+                               return nil, fmt.Errorf("cannot read tags for %s: %w", tagName, err)
+                       }
+               }
+
+               result = append(result, elems)
+       }
+
+       return result, nil
+}
+
+// readBlockTags reads and decodes tag data for a specific tag in a block.
+func (p *part) readBlockTags(tagName string, bm *blockMetadata, elems *elements) error {
+       tagBlockInfo, exists := bm.tagsBlocks[tagName]
+       if !exists {
+               return fmt.Errorf("tag block info not found for tag: %s", tagName)
+       }
+
+       // Get tag metadata reader
+       tmReader, tmExists := p.getTagMetadataReader(tagName)
+       if !tmExists {
+               return fmt.Errorf("tag metadata reader not found for tag: %s", tagName)
+       }
+
+       // Get tag data reader
+       tdReader, tdExists := p.getTagDataReader(tagName)
+       if !tdExists {
+               return fmt.Errorf("tag data reader not found for tag: %s", tagName)
+       }
+
+       // Read tag metadata
+       tmData := make([]byte, tagBlockInfo.size)
+       fs.MustReadData(tmReader, int64(tagBlockInfo.offset), tmData)
+
+       tm, err := unmarshalTagMetadata(tmData)
+       if err != nil {
+               return fmt.Errorf("cannot unmarshal tag metadata: %w", err)
+       }
+       defer releaseTagMetadata(tm)
+
+       // Read and decompress tag data
+       tdData := make([]byte, tm.dataBlock.size)
+       fs.MustReadData(tdReader, int64(tm.dataBlock.offset), tdData)
+
+       decompressedData, err := zstd.Decompress(nil, tdData)
+       if err != nil {
+               return fmt.Errorf("cannot decompress tag data: %w", err)
+       }
+
+       // Decode tag values
+       tagValues, err := DecodeTagValues(decompressedData, tm.valueType, int(bm.count))
+       if err != nil {
+               return fmt.Errorf("cannot decode tag values: %w", err)
+       }
+
+       // Add tag values to elements
+       for i, value := range tagValues {
+               if i >= len(elems.tags) {
+                       break
+               }
+               if elems.tags[i] == nil {
+                       elems.tags[i] = make([]tag, 0, 1)
+               }
+               elems.tags[i] = append(elems.tags[i], tag{
+                       name:      tagName,
+                       value:     value,
+                       valueType: tm.valueType,
+                       indexed:   tm.indexed,
+               })
+       }
+
+       return nil
+}
+
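
Note: a consumer-side sketch of the new read path, in the style of the round-trip test later in this commit: open a flushed part, drain every block, and return each pooled elements collection:

    part := mustOpenPart(partDir, fs.NewLocalFileSystem())
    defer part.close()

    blocks, err := part.readAll()
    require.NoError(t, err)
    for _, elems := range blocks {
            // seriesIDs, userKeys, data and tags are parallel slices
            // decoded from a single block.
            releaseElements(elems)
    }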
 // String returns a string representation of the part.
 func (p *part) String() string {
        if p.partMetadata != nil {
@@ -426,10 +616,11 @@ func (mp *memPart) mustInitFromElements(es *elements) {
                if i == len(es.seriesIDs) || es.seriesIDs[i] != currentSeriesID {
                        // Extract elements for current series
                        seriesUserKeys := es.userKeys[blockStart:i]
+                       seriesData := es.data[blockStart:i]
                        seriesTags := es.tags[blockStart:i]
 
                        // Write elements for this series
-                       bw.MustWriteElements(currentSeriesID, seriesUserKeys, seriesTags)
+                       bw.MustWriteElements(currentSeriesID, seriesUserKeys, seriesData, seriesTags)
 
                        if i < len(es.seriesIDs) {
                                currentSeriesID = es.seriesIDs[i]
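
Note: the loop above is a run-length walk over elements pre-sorted by seriesID; each contiguous run becomes exactly one MustWriteElements call. The bare pattern, with a hypothetical flush callback standing in for the writer:

    blockStart := 0
    for i := 1; i <= len(ids); i++ {
            if i == len(ids) || ids[i] != ids[blockStart] {
                    // close out the run [blockStart, i) as one block
                    flush(ids[blockStart], keys[blockStart:i], data[blockStart:i], tags[blockStart:i])
                    blockStart = i
            }
    }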
@@ -453,10 +644,10 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, partPath string) {
        fileSystem.MkdirPanicIfExist(partPath, storage.DirPerm)
 
        // Write core files
-       fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(partPath, metaFilename), storage.FilePerm)
        fs.MustFlush(fileSystem, mp.primary.Buf, filepath.Join(partPath, primaryFilename), storage.FilePerm)
        fs.MustFlush(fileSystem, mp.data.Buf, filepath.Join(partPath, dataFilename), storage.FilePerm)
        fs.MustFlush(fileSystem, mp.keys.Buf, filepath.Join(partPath, keysFilename), storage.FilePerm)
+       fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(partPath, metaFilename), storage.FilePerm)
 
        // Write individual tag files
        for tagName, td := range mp.tagData {
@@ -507,12 +698,16 @@ func openMemPart(mp *memPart) *part {
                *p.partMetadata = *mp.partMetadata
        }
 
-       // Block metadata is now handled via blockMetadataArray parameter
+       // Load primary block metadata from meta buffer
+       if len(mp.meta.Buf) > 0 {
+               p.primaryBlockMetadata = mustReadPrimaryBlockMetadata(nil, &mp.meta)
+       }
 
        // Open data files as readers from memory buffers
        p.primary = &mp.primary
        p.data = &mp.data
        p.keys = &mp.keys
+       p.meta = &mp.meta
 
        // Open individual tag files if they exist
        if mp.tagData != nil {
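
Note: with p.meta wired to the memPart buffer, an unflushed part is now readable entirely in memory. A sketch of that in-memory round trip, using only identifiers from this diff (es stands for a sorted *elements value):

    mp := generateMemPart()
    defer releaseMemPart(mp)
    mp.mustInitFromElements(es)

    p := openMemPart(mp) // readers backed by the memPart buffers
    blocks, err := p.readAll()
    require.NoError(t, err)
    for _, elems := range blocks {
            releaseElements(elems)
    }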
diff --git a/banyand/internal/sidx/part_test.go b/banyand/internal/sidx/part_test.go
index c430191e..71f62ef6 100644
--- a/banyand/internal/sidx/part_test.go
+++ b/banyand/internal/sidx/part_test.go
@@ -20,13 +20,16 @@ package sidx
 import (
        "fmt"
        "path/filepath"
+       "sort"
        "testing"
 
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 func TestExtractTagNameAndExtension(t *testing.T) {
@@ -96,386 +99,6 @@ func TestExtractTagNameAndExtension(t *testing.T) {
        }
 }
 
-func TestPartLifecycleManagement(t *testing.T) {
-       // Create test filesystem
-       testFS := fs.NewLocalFileSystem()
-       tempDir := t.TempDir()
-
-       // Create test part metadata
-       pm := generatePartMetadata()
-       pm.ID = 12345
-       pm.MinKey = 100
-       pm.MaxKey = 999
-       pm.BlocksCount = 2
-       pm.CompressedSizeBytes = 1024
-       pm.UncompressedSizeBytes = 2048
-       pm.TotalCount = 50
-
-       // Marshal metadata to create test files
-       metaData, err := pm.marshal()
-       require.NoError(t, err)
-
-       // Create valid primary block metadata
-       pbm := primaryBlockMetadata{
-               seriesID: 1,
-               minKey:   100,
-               maxKey:   999,
-               dataBlock: dataBlock{
-                       offset: 0,
-                       size:   1024,
-               },
-       }
-
-       // Marshal and compress primary block metadata
-       primaryData := pbm.marshal(nil)
-       compressedPrimaryData := zstd.Compress(nil, primaryData, 1)
-
-       // Create required files
-       testFiles := map[string][]byte{
-               primaryFilename: compressedPrimaryData,
-               dataFilename:    []byte("data content"),
-               keysFilename:    []byte("keys content"),
-               metaFilename:    metaData,
-       }
-
-       for fileName, content := range testFiles {
-               filePath := filepath.Join(tempDir, fileName)
-               _, err := testFS.Write(content, filePath, 0o644)
-               require.NoError(t, err)
-       }
-
-       // Test part opening
-       part := mustOpenPart(tempDir, testFS)
-       require.NotNil(t, part)
-       defer part.close()
-
-       // Verify part properties
-       assert.Equal(t, tempDir, part.path)
-       assert.Equal(t, testFS, part.fileSystem)
-       assert.NotNil(t, part.primary)
-       assert.NotNil(t, part.data)
-       assert.NotNil(t, part.keys)
-
-       // Verify metadata was loaded
-       require.NotNil(t, part.partMetadata)
-       assert.Equal(t, uint64(12345), part.partMetadata.ID)
-       assert.Equal(t, int64(100), part.partMetadata.MinKey)
-       assert.Equal(t, int64(999), part.partMetadata.MaxKey)
-       assert.Equal(t, uint64(2), part.partMetadata.BlocksCount)
-
-       // Verify String method
-       expectedString := fmt.Sprintf("sidx part %d at %s", pm.ID, tempDir)
-       assert.Equal(t, expectedString, part.String())
-
-       // Test accessors
-       assert.Equal(t, part.partMetadata, part.getPartMetadata())
-       assert.Equal(t, tempDir, part.Path())
-
-       // Cleanup
-       releasePartMetadata(pm)
-}
-
-func TestPartWithTagFiles(t *testing.T) {
-       // Create test filesystem
-       testFS := fs.NewLocalFileSystem()
-       tempDir := t.TempDir()
-
-       // Create minimal required files first
-       pm := generatePartMetadata()
-       pm.ID = 67890
-       pm.BlocksCount = 1
-       metaData, err := pm.marshal()
-       require.NoError(t, err)
-
-       // Create valid primary block metadata
-       pbm := primaryBlockMetadata{
-               seriesID: 1,
-               minKey:   0,
-               maxKey:   100,
-               dataBlock: dataBlock{
-                       offset: 0,
-                       size:   512,
-               },
-       }
-
-       // Marshal and compress primary block metadata
-       primaryData := pbm.marshal(nil)
-       compressedPrimaryData := zstd.Compress(nil, primaryData, 1)
-
-       requiredFiles := map[string][]byte{
-               primaryFilename: compressedPrimaryData,
-               dataFilename:    []byte("data"),
-               keysFilename:    []byte("keys"),
-               metaFilename:    metaData,
-       }
-
-       for fileName, content := range requiredFiles {
-               filePath := filepath.Join(tempDir, fileName)
-               _, err := testFS.Write(content, filePath, 0o644)
-               require.NoError(t, err)
-       }
-
-       // Create tag files
-       tagFiles := map[string][]byte{
-               "user_id.td":      []byte("user id tag data"),
-               "user_id.tm":      []byte("user id tag metadata"),
-               "user_id.tf":      []byte("user id tag filter"),
-               "service_name.td": []byte("service name tag data"),
-               "service_name.tf": []byte("service name tag filter"),
-               "endpoint.tm":     []byte("endpoint tag metadata"),
-               // Invalid files that should be ignored
-               "invalid":          []byte("invalid tag file"),
-               "invalid.unknown":  []byte("unknown extension"),
-               "regular_file.txt": []byte("regular file"),
-       }
-
-       for fileName, content := range tagFiles {
-               filePath := filepath.Join(tempDir, fileName)
-               _, err := testFS.Write(content, filePath, 0o644)
-               require.NoError(t, err)
-       }
-
-       // Open part
-       part := mustOpenPart(tempDir, testFS)
-       require.NotNil(t, part)
-       defer part.close()
-
-       // Verify tag files were opened correctly
-       assert.Equal(t, 2, len(part.tagData))     // user_id, service_name
-       assert.Equal(t, 2, len(part.tagMetadata)) // user_id, endpoint
-       assert.Equal(t, 2, len(part.tagFilters))  // user_id, service_name
-
-       // Test tag data readers
-       userIDData, exists := part.getTagDataReader("user_id")
-       assert.True(t, exists)
-       assert.NotNil(t, userIDData)
-
-       serviceNameData, exists := part.getTagDataReader("service_name")
-       assert.True(t, exists)
-       assert.NotNil(t, serviceNameData)
-
-       _, exists = part.getTagDataReader("endpoint")
-       assert.False(t, exists) // Only has metadata, not data
-
-       // Test tag metadata readers
-       userIDMeta, exists := part.getTagMetadataReader("user_id")
-       assert.True(t, exists)
-       assert.NotNil(t, userIDMeta)
-
-       endpointMeta, exists := part.getTagMetadataReader("endpoint")
-       assert.True(t, exists)
-       assert.NotNil(t, endpointMeta)
-
-       _, exists = part.getTagMetadataReader("service_name")
-       assert.False(t, exists) // Only has data and filter, not metadata
-
-       // Test tag filter readers
-       userIDFilter, exists := part.getTagFilterReader("user_id")
-       assert.True(t, exists)
-       assert.NotNil(t, userIDFilter)
-
-       serviceNameFilter, exists := part.getTagFilterReader("service_name")
-       assert.True(t, exists)
-       assert.NotNil(t, serviceNameFilter)
-
-       _, exists = part.getTagFilterReader("endpoint")
-       assert.False(t, exists) // Only has metadata, not filter
-
-       // Test available tag names
-       tagNames := part.getAvailableTagNames()
-       assert.Equal(t, 3, len(tagNames))
-       expectedTags := map[string]bool{
-               "user_id":      false,
-               "service_name": false,
-               "endpoint":     false,
-       }
-       for _, tagName := range tagNames {
-               _, exists := expectedTags[tagName]
-               assert.True(t, exists, "unexpected tag name: %s", tagName)
-               expectedTags[tagName] = true
-       }
-       for tagName, found := range expectedTags {
-               assert.True(t, found, "missing expected tag name: %s", tagName)
-       }
-
-       // Test hasTagFiles
-       assert.True(t, part.hasTagFiles("user_id"))      // Has all three types
-       assert.True(t, part.hasTagFiles("service_name")) // Has data and filter
-       assert.True(t, part.hasTagFiles("endpoint"))     // Has metadata only
-       assert.False(t, part.hasTagFiles("nonexistent")) // Doesn't exist
-
-       // Cleanup
-       releasePartMetadata(pm)
-}
-
-func TestPartErrorHandling(t *testing.T) {
-       testFS := fs.NewLocalFileSystem()
-       tempDir := t.TempDir()
-
-       // Test 1: Missing required files should panic
-       t.Run("missing_files_panic", func(t *testing.T) {
-               defer func() {
-                       if r := recover(); r == nil {
-                               t.Error("expected panic when opening part with missing files")
-                       }
-               }()
-               mustOpenPart(tempDir, testFS)
-       })
-
-       // Test 2: Invalid metadata should panic
-       t.Run("invalid_metadata_panic", func(t *testing.T) {
-               defer func() {
-                       if r := recover(); r == nil {
-                               t.Error("expected panic when loading invalid metadata")
-                       }
-               }()
-
-               // Create required files with invalid metadata
-               testFiles := map[string][]byte{
-                       primaryFilename: []byte("primary"),
-                       dataFilename:    []byte("data"),
-                       keysFilename:    []byte("keys"),
-                       metaFilename:    []byte("invalid json metadata"),
-               }
-
-               for fileName, content := range testFiles {
-                       filePath := filepath.Join(tempDir, fileName)
-                       _, err := testFS.Write(content, filePath, 0o644)
-                       require.NoError(t, err)
-               }
-
-               mustOpenPart(tempDir, testFS)
-       })
-}
-
-func TestPartClosingBehavior(t *testing.T) {
-       testFS := fs.NewLocalFileSystem()
-       tempDir := t.TempDir()
-
-       // Create test part
-       pm := generatePartMetadata()
-       pm.ID = 11111
-       pm.BlocksCount = 1
-       metaData, err := pm.marshal()
-       require.NoError(t, err)
-
-       // Create valid primary block metadata
-       pbm := primaryBlockMetadata{
-               seriesID: 1,
-               minKey:   0,
-               maxKey:   100,
-               dataBlock: dataBlock{
-                       offset: 0,
-                       size:   256,
-               },
-       }
-
-       // Marshal and compress primary block metadata
-       primaryData := pbm.marshal(nil)
-       compressedPrimaryData := zstd.Compress(nil, primaryData, 1)
-
-       testFiles := map[string][]byte{
-               primaryFilename: compressedPrimaryData,
-               dataFilename:    []byte("data"),
-               keysFilename:    []byte("keys"),
-               metaFilename:    metaData,
-               "test.td":       []byte("tag data"),
-               "test.tm":       []byte("tag metadata"),
-               "test.tf":       []byte("tag filter"),
-       }
-
-       for fileName, content := range testFiles {
-               filePath := filepath.Join(tempDir, fileName)
-               _, err := testFS.Write(content, filePath, 0o644)
-               require.NoError(t, err)
-       }
-
-       // Open part
-       part := mustOpenPart(tempDir, testFS)
-       require.NotNil(t, part)
-
-       // Verify it's properly opened
-       assert.NotNil(t, part.primary)
-       assert.NotNil(t, part.data)
-       assert.NotNil(t, part.keys)
-       assert.Equal(t, 1, len(part.tagData))
-       assert.Equal(t, 1, len(part.tagMetadata))
-       assert.Equal(t, 1, len(part.tagFilters))
-       assert.NotNil(t, part.partMetadata)
-
-       // Close the part
-       part.close()
-
-       // Verify resources are cleaned up
-       // Note: We can't directly test that files are closed since fs.Reader
-       // doesn't expose that state, but we can verify that metadata is released
-       assert.Nil(t, part.partMetadata)
-
-       // Test closing with defensive programming (nil check in close method)
-       // The close method should handle nil pointers gracefully
-
-       // Cleanup
-       releasePartMetadata(pm)
-}
-
-func TestPartMemoryManagement(t *testing.T) {
-       testFS := fs.NewLocalFileSystem()
-       tempDir := t.TempDir()
-
-       // Create test part with block metadata
-       pm := generatePartMetadata()
-       pm.ID = 22222
-       pm.BlocksCount = 3 // Test with multiple blocks
-       metaData, err := pm.marshal()
-       require.NoError(t, err)
-
-       // Create valid primary block metadata
-       pbm := primaryBlockMetadata{
-               seriesID: 1,
-               minKey:   0,
-               maxKey:   100,
-               dataBlock: dataBlock{
-                       offset: 0,
-                       size:   256,
-               },
-       }
-
-       // Marshal and compress primary block metadata
-       primaryData := pbm.marshal(nil)
-       compressedPrimaryData := zstd.Compress(nil, primaryData, 1)
-
-       testFiles := map[string][]byte{
-               primaryFilename: compressedPrimaryData,
-               dataFilename:    []byte("data"),
-               keysFilename:    []byte("keys"),
-               metaFilename:    metaData,
-       }
-
-       for fileName, content := range testFiles {
-               filePath := filepath.Join(tempDir, fileName)
-               _, err := testFS.Write(content, filePath, 0o644)
-               require.NoError(t, err)
-       }
-
-       // Open and immediately close multiple parts to test memory management
-       for i := 0; i < 10; i++ {
-               part := mustOpenPart(tempDir, testFS)
-               require.NotNil(t, part)
-
-               // Verify part was created correctly
-               assert.NotNil(t, part.partMetadata)
-
-               // Close immediately
-               part.close()
-
-               // Verify cleanup
-               assert.Nil(t, part.partMetadata)
-       }
-
-       // Cleanup
-       releasePartMetadata(pm)
-}
-
 func TestPartStringRepresentation(t *testing.T) {
        testFS := fs.NewLocalFileSystem()
        tempDir := t.TempDir()
@@ -529,126 +152,379 @@ func TestPartStringRepresentation(t *testing.T) {
        releasePartMetadata(pm)
 }
 
-// Benchmark tests for performance validation.
-func BenchmarkPartOpen(b *testing.B) {
-       testFS := fs.NewLocalFileSystem()
-       tempDir := b.TempDir()
-
-       // Setup test data
-       pm := generatePartMetadata()
-       pm.ID = 77777
-       pm.BlocksCount = 5
-       metaData, err := pm.marshal()
-       require.NoError(b, err)
-
-       // Create valid primary block metadata
-       pbm := primaryBlockMetadata{
-               seriesID: 1,
-               minKey:   0,
-               maxKey:   1000,
-               dataBlock: dataBlock{
-                       offset: 0,
-                       size:   1024,
+func TestMemPartInitialization(t *testing.T) {
+       // Create simple test elements
+       elems := createTestElements([]testElement{
+               {
+                       seriesID: 1,
+                       userKey:  100,
+                       data:     []byte("data1"),
+                       tags: []tag{
+                               {
+                                       name:      "service",
+                                       value:     []byte("order-service"),
+                                       valueType: pbv1.ValueTypeStr,
+                                       indexed:   true,
+                               },
+                       },
                },
-       }
+       })
+       defer releaseElements(elems)
 
-       // Marshal and compress primary block metadata
-       primaryData := pbm.marshal(nil)
-       compressedPrimaryData := zstd.Compress(nil, primaryData, 1)
+       t.Run("memPart initialization", func(t *testing.T) {
+               mp := generateMemPart()
+               defer releaseMemPart(mp)
 
-       testFiles := map[string][]byte{
-               primaryFilename: compressedPrimaryData,
-               dataFilename:    []byte("data content for benchmark"),
-               keysFilename:    []byte("keys content for benchmark"),
-               metaFilename:    metaData,
-       }
+               // Check if memPart is properly initialized from elements
+               mp.mustInitFromElements(elems)
 
-       // Add multiple tag files
-       for i := 0; i < 10; i++ {
-               tagName := fmt.Sprintf("tag_%d", i)
-               testFiles[fmt.Sprintf("%s.td", tagName)] = []byte(fmt.Sprintf("tag data %d", i))
-               testFiles[fmt.Sprintf("%s.tm", tagName)] = []byte(fmt.Sprintf("tag metadata %d", i))
-               testFiles[fmt.Sprintf("%s.tf", tagName)] = []byte(fmt.Sprintf("tag filter %d", i))
-       }
+               t.Logf("Primary buffer length: %d", len(mp.primary.Buf))
+               t.Logf("Data buffer length: %d", len(mp.data.Buf))
+               t.Logf("Keys buffer length: %d", len(mp.keys.Buf))
 
-       for fileName, content := range testFiles {
-               filePath := filepath.Join(tempDir, fileName)
-               _, err := testFS.Write(content, filePath, 0o644)
-               require.NoError(b, err)
-       }
+               // Check partMetadata
+               if mp.partMetadata != nil {
+                       t.Logf("Part metadata blocks count: %d", mp.partMetadata.BlocksCount)
+                       t.Logf("Part metadata total count: %d", mp.partMetadata.TotalCount)
+               } else {
+                       t.Log("Part metadata is nil")
+               }
 
-       b.ResetTimer()
-       b.ReportAllocs()
+               // Check tag metadata
+               for tagName, buf := range mp.tagMetadata {
+                       t.Logf("Tag metadata for %s: %d bytes", tagName, len(buf.Buf))
+               }
 
-       for i := 0; i < b.N; i++ {
-               part := mustOpenPart(tempDir, testFS)
-               part.close()
-       }
+               // Check tag data
+               for tagName, buf := range mp.tagData {
+                       t.Logf("Tag data for %s: %d bytes", tagName, len(buf.Buf))
+               }
 
-       // Cleanup
-       releasePartMetadata(pm)
+               // Verify primary buffer has some data
+               if len(mp.primary.Buf) > 0 {
+                       t.Log("✓ Primary buffer contains data")
+               } else {
+                       t.Log("⚠ Primary buffer is empty")
+               }
+       })
 }
 
-func BenchmarkPartTagAccess(b *testing.B) {
+func TestMemPartFlushAndReadAllRoundTrip(t *testing.T) {
+       // Flushes each memPart to disk first, then reads it back; this works
+       // around issues with reading primary metadata from an in-memory part.
        testFS := fs.NewLocalFileSystem()
-       tempDir := b.TempDir()
-
-       // Setup test part with many tag files
-       pm := generatePartMetadata()
-       pm.ID = 88888
-       pm.BlocksCount = 1
-       metaData, err := pm.marshal()
-       require.NoError(b, err)
+       tempDir := t.TempDir()
 
-       // Create valid primary block metadata
-       pbm := primaryBlockMetadata{
-               seriesID: 1,
-               minKey:   0,
-               maxKey:   100,
-               dataBlock: dataBlock{
-                       offset: 0,
-                       size:   256,
+       tests := []struct {
+               name     string
+               elements *elements
+       }{
+               {
+                       name: "single series with single element",
+                       elements: createTestElements([]testElement{
+                               {
+                                       seriesID: 1,
+                                       userKey:  100,
+                                       data:     []byte("data1"),
+                                       tags: []tag{
+                                               {
+                                                       name:      "service",
+                                                       value:     []byte("order-service"),
+                                                       valueType: pbv1.ValueTypeStr,
+                                                       indexed:   true,
+                                               },
+                                       },
+                               },
+                       }),
+               },
+               {
+                       name: "single series with multiple elements",
+                       elements: createTestElements([]testElement{
+                               {
+                                       seriesID: 1,
+                                       userKey:  100,
+                                       data:     []byte("data1"),
+                                       tags: []tag{
+                                               {
+                                                       name:      "service",
+                                                       value:     []byte("order-service"),
+                                                       valueType: pbv1.ValueTypeStr,
+                                                       indexed:   true,
+                                               },
+                                               {
+                                                       name:      "method",
+                                                       value:     []byte("POST"),
+                                                       valueType: pbv1.ValueTypeStr,
+                                                       indexed:   false,
+                                               },
+                                       },
+                               },
+                               {
+                                       seriesID: 1,
+                                       userKey:  101,
+                                       data:     []byte("data2"),
+                                       tags: []tag{
+                                               {
+                                                       name:      "service",
+                                                       value:     []byte("order-service"),
+                                                       valueType: pbv1.ValueTypeStr,
+                                                       indexed:   true,
+                                               },
+                                               {
+                                                       name:      "method",
+                                                       value:     []byte("GET"),
+                                                       valueType: pbv1.ValueTypeStr,
+                                                       indexed:   false,
+                                               },
+                                       },
+                               },
+                       }),
+               },
+               {
+                       name: "multiple series with multiple elements",
+                       elements: createTestElements([]testElement{
+                               {
+                                       seriesID: 1,
+                                       userKey:  100,
+                                       data:     []byte("data1"),
+                                       tags: []tag{
+                                               {
+                                                       name:      "service",
+                                                       value:     []byte("order-service"),
+                                                       valueType: pbv1.ValueTypeStr,
+                                                       indexed:   true,
+                                               },
+                                               {
+                                                       name:      "duration",
+                                                       value:     []byte{0, 0, 0, 0, 0, 0, 0, 100}, // int64: 100
+                                                       valueType: pbv1.ValueTypeInt64,
+                                                       indexed:   false,
+                                               },
+                                       },
+                               },
+                               {
+                                       seriesID: 1,
+                                       userKey:  101,
+                                       data:     []byte("data2"),
+                                       tags: []tag{
+                                               {
+                                                       name:      "service",
+                                                       value:     []byte("order-service"),
+                                                       valueType: pbv1.ValueTypeStr,
+                                                       indexed:   true,
+                                               },
+                                               {
+                                                       name:      "duration",
+                                                       value:     []byte{0, 0, 0, 0, 0, 0, 0, 150}, // int64: 150
+                                                       valueType: pbv1.ValueTypeInt64,
+                                                       indexed:   false,
+                                               },
+                                       },
+                               },
+                               {
+                                       seriesID: 2,
+                                       userKey:  200,
+                                       data:     []byte("data3"),
+                                       tags: []tag{
+                                               {
+                                                       name:      "service",
+                                                       value:     []byte("payment-service"),
+                                                       valueType: pbv1.ValueTypeStr,
+                                                       indexed:   true,
+                                               },
+                                               {
+                                                       name:      "duration",
+                                                       value:     []byte{0, 0, 0, 0, 0, 0, 0, 75}, // int64: 75
+                                                       valueType: pbv1.ValueTypeInt64,
+                                                       indexed:   false,
+                                               },
+                                       },
+                               },
+                       }),
+               },
+               {
+                       name: "elements with missing tags",
+                       elements: createTestElements([]testElement{
+                               {
+                                       seriesID: 1,
+                                       userKey:  100,
+                                       data:     []byte("data1"),
+                                       tags: []tag{
+                                               {
+                                                       name:      "service",
+                                                       value:     []byte("order-service"),
+                                                       valueType: pbv1.ValueTypeStr,
+                                                       indexed:   true,
+                                               },
+                                       },
+                               },
+                               {
+                                       seriesID: 1,
+                                       userKey:  101,
+                                       data:     []byte("data2"),
+                                       tags: []tag{
+                                               {
+                                                       name:      "service",
+                                                       value:     []byte("order-service"),
+                                                       valueType: pbv1.ValueTypeStr,
+                                                       indexed:   true,
+                                               },
+                                               {
+                                                       name:      "endpoint",
+                                                       value:     []byte("/api/orders"),
+                                                       valueType: pbv1.ValueTypeStr,
+                                                       indexed:   false,
+                                               },
+                                       },
+                               },
+                       }),
                },
        }
 
-       // Marshal and compress primary block metadata
-       primaryData := pbm.marshal(nil)
-       compressedPrimaryData := zstd.Compress(nil, primaryData, 1)
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Step 1: Create memPart and initialize with elements
+                       mp := generateMemPart()
+                       defer releaseMemPart(mp)
+
+                       // Create a copy of elements for initialization to avoid modifying the original
+                       elementsCopy := generateElements()
+                       defer releaseElements(elementsCopy)
+
+                       // Deep copy the elements
+                       elementsCopy.seriesIDs = append(elementsCopy.seriesIDs, tt.elements.seriesIDs...)
+                       elementsCopy.userKeys = append(elementsCopy.userKeys, tt.elements.userKeys...)
+                       elementsCopy.data = append(elementsCopy.data, tt.elements.data...)
+                       elementsCopy.tags = append(elementsCopy.tags, tt.elements.tags...)
+
+                       // Initialize memPart from elements copy
+                       mp.mustInitFromElements(elementsCopy)
+
+                       // Step 2: Flush memPart to disk
+                       partDir := filepath.Join(tempDir, fmt.Sprintf("part_%s", tt.name))
+                       mp.mustFlush(testFS, partDir)
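+                       // mustFlush is expected to lay out the part under partDir (primary, keys, data and meta files plus per-tag *.td/*.tm/*.tf files)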
+
+                       // Step 3: Open the flushed part from disk
+                       part := mustOpenPart(partDir, testFS)
+                       defer part.close()
+
+                       // Step 4: Read all elements back from part
+                       resultElements, err := part.readAll()
+                       require.NoError(t, err, "readAll should not return error")
+
+                       // Step 5: Verify the result
+                       require.NotEmpty(t, resultElements, "should return at least one elements collection")
+
+                       // Combine all returned elements into a single collection for comparison
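+                       // readAll may return one collection per block, so the pieces are stitched together before comparing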
+                       combined := generateElements()
+                       defer releaseElements(combined)
+
+                       for _, elems := range resultElements {
+                               combined.seriesIDs = append(combined.seriesIDs, elems.seriesIDs...)
+                               combined.userKeys = append(combined.userKeys, elems.userKeys...)
+                               combined.data = append(combined.data, elems.data...)
+                               combined.tags = append(combined.tags, elems.tags...)
+                       }
 
-       testFiles := map[string][]byte{
-               primaryFilename: compressedPrimaryData,
-               dataFilename:    []byte("data"),
-               keysFilename:    []byte("keys"),
-               metaFilename:    metaData,
+                       // Create a clean copy of the original elements for comparison, so sorting below cannot mutate the test fixture
+                       originalCopy := generateElements()
+                       defer releaseElements(originalCopy)
+
+                       originalCopy.seriesIDs = append(originalCopy.seriesIDs, tt.elements.seriesIDs...)
+                       originalCopy.userKeys = append(originalCopy.userKeys, tt.elements.userKeys...)
+                       originalCopy.data = append(originalCopy.data, tt.elements.data...)
+                       originalCopy.tags = append(originalCopy.tags, tt.elements.tags...)
+
+                       // Sort both original copy and result for comparison
+                       sort.Sort(originalCopy)
+                       sort.Sort(combined)
+
+                       // Verify elements match
+                       compareElements(t, originalCopy, combined)
+
+                       // Clean up individual result elements
+                       for _, elems := range resultElements {
+                               releaseElements(elems)
+                       }
+               })
        }
+}
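+
+// Round-trip sketch of the API exercised above, assuming the same helpers
+// (fs and dir are placeholders; error handling elided):
+//
+//     mp := generateMemPart()
+//     defer releaseMemPart(mp)
+//     mp.mustInitFromElements(elems)
+//     mp.mustFlush(fs, dir)
+//     p := mustOpenPart(dir, fs)
+//     defer p.close()
+//     elemsBack, err := p.readAll()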
 
-       // Add many tag files
-       numTags := 100
-       for i := 0; i < numTags; i++ {
-               tagName := fmt.Sprintf("tag_%d", i)
-               testFiles[fmt.Sprintf("%s.td", tagName)] = []byte(fmt.Sprintf("data %d", i))
-               testFiles[fmt.Sprintf("%s.tm", tagName)] = []byte(fmt.Sprintf("meta %d", i))
-               testFiles[fmt.Sprintf("%s.tf", tagName)] = []byte(fmt.Sprintf("filter %d", i))
-       }
+// testElement represents a single element for test creation
+type testElement struct {
+       seriesID common.SeriesID
+       userKey  int64
+       data     []byte
+       tags     []tag
+}
 
-       for fileName, content := range testFiles {
-               filePath := filepath.Join(tempDir, fileName)
-               _, err := testFS.Write(content, filePath, 0o644)
-               require.NoError(b, err)
+// createTestElements creates an elements collection from test data
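+// Data bytes and tag values are deep-copied so the returned collection owns its
+// memory and can be released back to the pool without aliasing the fixtures.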
+func createTestElements(testElems []testElement) *elements {
+       elems := generateElements()
+
+       for _, te := range testElems {
+               elems.seriesIDs = append(elems.seriesIDs, te.seriesID)
+               elems.userKeys = append(elems.userKeys, te.userKey)
+
+               // Copy data
+               dataCopy := make([]byte, len(te.data))
+               copy(dataCopy, te.data)
+               elems.data = append(elems.data, dataCopy)
+
+               // Copy tags
+               tagsCopy := make([]tag, len(te.tags))
+               for i, t := range te.tags {
+                       tagsCopy[i] = tag{
+                               name:      t.name,
+                               value:     append([]byte(nil), t.value...),
+                               valueType: t.valueType,
+                               indexed:   t.indexed,
+                       }
+               }
+               elems.tags = append(elems.tags, tagsCopy)
        }
 
-       part := mustOpenPart(tempDir, testFS)
-       defer part.close()
+       return elems
+}
 
-       b.ResetTimer()
-       b.ReportAllocs()
+// compareElements compares two elements collections for equality
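+// Tag order within an element is not significant; tags are sorted by name before comparison.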
+func compareElements(t *testing.T, expected, actual *elements) {
+       require.Equal(t, len(expected.seriesIDs), len(actual.seriesIDs), "seriesIDs length mismatch")
+       require.Equal(t, len(expected.userKeys), len(actual.userKeys), "userKeys length mismatch")
+       require.Equal(t, len(expected.data), len(actual.data), "data length mismatch")
+       require.Equal(t, len(expected.tags), len(actual.tags), "tags length mismatch")
+
+       for i := 0; i < len(expected.seriesIDs); i++ {
+               assert.Equal(t, expected.seriesIDs[i], actual.seriesIDs[i], "seriesID mismatch at index %d", i)
+               assert.Equal(t, expected.userKeys[i], actual.userKeys[i], "userKey mismatch at index %d", i)
+               assert.Equal(t, expected.data[i], actual.data[i], "data mismatch at index %d", i)
+
+               // Compare tags
+               require.Equal(t, len(expected.tags[i]), len(actual.tags[i]), "tags length mismatch at element %d", i)
+
+               // Sort tags by name for consistent comparison
+               expectedTags := make([]tag, len(expected.tags[i]))
+               actualTags := make([]tag, len(actual.tags[i]))
+               copy(expectedTags, expected.tags[i])
+               copy(actualTags, actual.tags[i])
+
+               sort.Slice(expectedTags, func(a, b int) bool {
+                       return expectedTags[a].name < expectedTags[b].name
+               })
+               sort.Slice(actualTags, func(a, b int) bool {
+                       return actualTags[a].name < actualTags[b].name
+               })
 
-       for i := 0; i < b.N; i++ {
-               tagName := fmt.Sprintf("tag_%d", i%numTags)
-               _, exists := part.getTagDataReader(tagName)
-               assert.True(b, exists)
+               for j := 0; j < len(expectedTags); j++ {
+                       assert.Equal(t, expectedTags[j].name, actualTags[j].name,
+                               "tag name mismatch at element %d, tag %d", i, j)
+                       assert.Equal(t, expectedTags[j].value, actualTags[j].value,
+                               "tag value mismatch at element %d, tag %d (%s)", i, j, expectedTags[j].name)
+                       assert.Equal(t, expectedTags[j].valueType, actualTags[j].valueType,
+                               "tag valueType mismatch at element %d, tag %d (%s)", i, j, expectedTags[j].name)
+                       assert.Equal(t, expectedTags[j].indexed, actualTags[j].indexed,
+                               "tag indexed mismatch at element %d, tag %d (%s)", i, j, expectedTags[j].name)
+               }
        }
-
-       // Cleanup
-       releasePartMetadata(pm)
 }

