This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sidx/query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e11b3ef847597def13103d76ab61aec64ea4dfbe Author: Gao Hongtao <[email protected]> AuthorDate: Sun Aug 24 08:52:11 2025 +0800 Refactor block and part handling: Remove mustInitFromElements method from block, update MustWriteElements to accept data directly, and enhance part to read and decode metadata and tags. Improve memory management and streamline element initialization in tests. --- banyand/internal/sidx/block.go | 52 +-- banyand/internal/sidx/block_test.go | 69 --- banyand/internal/sidx/block_writer.go | 21 +- banyand/internal/sidx/metadata.go | 56 ++- banyand/internal/sidx/part.go | 207 ++++++++- banyand/internal/sidx/part_test.go | 832 +++++++++++++++------------------- 6 files changed, 619 insertions(+), 618 deletions(-) diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go index a0ece632..cc638542 100644 --- a/banyand/internal/sidx/block.go +++ b/banyand/internal/sidx/block.go @@ -95,38 +95,6 @@ func (b *block) reset() { b.pooled = false } -// mustInitFromElements initializes block from sorted elements. -func (b *block) mustInitFromElements(elems *elements) { - b.reset() - if elems.Len() == 0 { - return - } - - // Verify elements are sorted - elems.assertSorted() - - // Copy core data - b.userKeys = append(b.userKeys, elems.userKeys...) - b.data = append(b.data, elems.data...) - - // Process tags - b.mustInitFromTags(elems.tags) -} - -// assertSorted verifies that elements are sorted correctly. -func (e *elements) assertSorted() { - for i := 1; i < e.Len(); i++ { - if e.seriesIDs[i] < e.seriesIDs[i-1] { - panic(fmt.Sprintf("elements not sorted by seriesID: index %d (%d) < index %d (%d)", - i, e.seriesIDs[i], i-1, e.seriesIDs[i-1])) - } - if e.seriesIDs[i] == e.seriesIDs[i-1] && e.userKeys[i] < e.userKeys[i-1] { - panic(fmt.Sprintf("elements not sorted by userKey: index %d (%d) < index %d (%d) for seriesID %d", - i, e.userKeys[i], i-1, e.userKeys[i-1], e.seriesIDs[i])) - } - } -} - // mustInitFromTags processes tag data for the block. func (b *block) mustInitFromTags(elementTags [][]tag) { if len(elementTags) == 0 { @@ -280,8 +248,8 @@ func (b *block) mustWriteTo(sid common.SeriesID, bm *blockMetadata, ww *writers) bm.uncompressedSize = b.uncompressedSizeBytes() bm.count = uint64(b.Len()) - // Write user keys to keys.bin - mustWriteKeysTo(&bm.keysBlock, b.userKeys, &ww.keysWriter) + // Write user keys to keys.bin and capture encoding information + bm.keysEncodeType, bm.minKey = mustWriteKeysTo(&bm.keysBlock, b.userKeys, &ww.keysWriter) // Write data payloads to data.bin mustWriteDataTo(&bm.dataBlock, b.data, &ww.dataWriter) @@ -346,13 +314,13 @@ func (b *block) mustWriteTag(tagName string, td *tagData, bm *blockMetadata, ww tmw.MustWrite(bb.Buf) // Update block metadata - tagMeta := bm.getTagMetadata(tagName) - tagMeta.offset = tmw.bytesWritten - uint64(len(bb.Buf)) - tagMeta.size = uint64(len(bb.Buf)) + offset := tmw.bytesWritten - uint64(len(bb.Buf)) + size := uint64(len(bb.Buf)) + bm.setTagMetadata(tagName, offset, size) } -// mustWriteKeysTo writes user keys to the keys writer. -func mustWriteKeysTo(kb *dataBlock, userKeys []int64, keysWriter *writer) { +// mustWriteKeysTo writes user keys to the keys writer and returns encoding metadata. +func mustWriteKeysTo(kb *dataBlock, userKeys []int64, keysWriter *writer) (encoding.EncodeType, int64) { bb := bigValuePool.Get() if bb == nil { bb = &bytes.Buffer{} @@ -363,13 +331,17 @@ func mustWriteKeysTo(kb *dataBlock, userKeys []int64, keysWriter *writer) { }() // Encode user keys - bb.Buf, _, _ = encoding.Int64ListToBytes(bb.Buf[:0], userKeys) + var encodeType encoding.EncodeType + var firstValue int64 + bb.Buf, encodeType, firstValue = encoding.Int64ListToBytes(bb.Buf[:0], userKeys) // Compress and write compressedData := zstd.Compress(nil, bb.Buf, 1) kb.offset = keysWriter.bytesWritten kb.size = uint64(len(compressedData)) keysWriter.MustWrite(compressedData) + + return encodeType, firstValue } // mustWriteDataTo writes data payloads to the data writer. diff --git a/banyand/internal/sidx/block_test.go b/banyand/internal/sidx/block_test.go index 3b972a62..b3ed92c9 100644 --- a/banyand/internal/sidx/block_test.go +++ b/banyand/internal/sidx/block_test.go @@ -19,9 +19,6 @@ package sidx import ( "testing" - - "github.com/apache/skywalking-banyandb/api/common" - pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) func TestBlock_BasicOperations(t *testing.T) { @@ -43,72 +40,6 @@ func TestBlock_BasicOperations(t *testing.T) { } } -func TestBlock_InitFromElements(t *testing.T) { - // Create test elements - elems := generateElements() - defer releaseElements(elems) - - // Add test data - elems.seriesIDs = append(elems.seriesIDs, common.SeriesID(1), common.SeriesID(1)) - elems.userKeys = append(elems.userKeys, 100, 200) - elems.data = append(elems.data, []byte("data1"), []byte("data2")) - - // Add test tags - tag1 := tag{ - name: "service", - value: []byte("web-service"), - valueType: pbv1.ValueTypeStr, - indexed: true, - } - tag2 := tag{ - name: "endpoint", - value: []byte("/api/users"), - valueType: pbv1.ValueTypeStr, - indexed: false, - } - - elems.tags = append(elems.tags, - []tag{tag1, tag2}, - []tag{tag1, tag2}, - ) - - // Create block and initialize from elements - b := generateBlock() - defer releaseBlock(b) - - b.mustInitFromElements(elems) - - // Verify block state - if b.isEmpty() { - t.Error("Block should not be empty after initialization") - } - - if b.Len() != 2 { - t.Errorf("Expected block length 2, got %d", b.Len()) - } - - if len(b.userKeys) != 2 { - t.Errorf("Expected 2 user keys, got %d", len(b.userKeys)) - } - - if b.userKeys[0] != 100 || b.userKeys[1] != 200 { - t.Errorf("User keys not properly set: got %v", b.userKeys) - } - - // Verify tags were processed - if len(b.tags) != 2 { - t.Errorf("Expected 2 tags, got %d", len(b.tags)) - } - - if _, exists := b.tags["service"]; !exists { - t.Error("Expected 'service' tag to exist") - } - - if _, exists := b.tags["endpoint"]; !exists { - t.Error("Expected 'endpoint' tag to exist") - } -} - func TestBlock_Validation(t *testing.T) { b := generateBlock() defer releaseBlock(b) diff --git a/banyand/internal/sidx/block_writer.go b/banyand/internal/sidx/block_writer.go index 76a99528..8067e009 100644 --- a/banyand/internal/sidx/block_writer.go +++ b/banyand/internal/sidx/block_writer.go @@ -239,29 +239,20 @@ func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path string } // MustWriteElements writes elements to the block writer. -func (bw *blockWriter) MustWriteElements(sid common.SeriesID, userKeys []int64, tags [][]tag) { +func (bw *blockWriter) MustWriteElements(sid common.SeriesID, userKeys []int64, data [][]byte, tags [][]tag) { if len(userKeys) == 0 { return } b := generateBlock() defer releaseBlock(b) + // Copy core data + b.userKeys = append(b.userKeys, userKeys...) + b.data = append(b.data, data...) - // Convert to elements format for initialization - es := generateElements() - defer releaseElements(es) + // Process tags + b.mustInitFromTags(tags) - es.seriesIDs = append(es.seriesIDs, sid) - es.userKeys = append(es.userKeys, userKeys...) - es.tags = append(es.tags, tags...) - - // Create data slice with proper length - es.data = make([][]byte, len(userKeys)) - for i := range es.data { - es.data[i] = nil // Placeholder data - } - - b.mustInitFromElements(es) bw.mustWriteBlock(sid, b) } diff --git a/banyand/internal/sidx/metadata.go b/banyand/internal/sidx/metadata.go index 008ec728..85b7dcab 100644 --- a/banyand/internal/sidx/metadata.go +++ b/banyand/internal/sidx/metadata.go @@ -25,6 +25,7 @@ import ( "io" "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/pool" ) @@ -55,6 +56,7 @@ type blockMetadata struct { maxKey int64 uncompressedSize uint64 count uint64 + keysEncodeType encoding.EncodeType } type blockMetadataArray struct { @@ -150,6 +152,7 @@ func (bm *blockMetadata) reset() { bm.keysBlock = dataBlock{} bm.uncompressedSize = 0 bm.count = 0 + bm.keysEncodeType = 0 // Clear maps but keep them allocated for k := range bm.tagsBlocks { @@ -184,8 +187,8 @@ func (bm *blockMetadata) validate() error { if bm.dataBlock.size == 0 { return fmt.Errorf("invalid data block: size cannot be zero") } - if bm.keysBlock.size == 0 { - return fmt.Errorf("invalid keys block: size cannot be zero") + if bm.keysBlock.size == 0 && bm.keysEncodeType != encoding.EncodeTypeConst { + return fmt.Errorf("invalid keys block: size cannot be zero unless using const encoding") } return nil } @@ -297,6 +300,22 @@ func (bm *blockMetadata) marshal() ([]byte, error) { return nil, fmt.Errorf("failed to write keys block size: %w", err) } + // Write keys encoding information + if err := binary.Write(buf, binary.LittleEndian, byte(bm.keysEncodeType)); err != nil { + return nil, fmt.Errorf("failed to write keys encode type: %w", err) + } + if err := binary.Write(buf, binary.LittleEndian, bm.minKey); err != nil { + return nil, fmt.Errorf("failed to write keys first value: %w", err) + } + + // Write count and uncompressed size + if err := binary.Write(buf, binary.LittleEndian, bm.count); err != nil { + return nil, fmt.Errorf("failed to write count: %w", err) + } + if err := binary.Write(buf, binary.LittleEndian, bm.uncompressedSize); err != nil { + return nil, fmt.Errorf("failed to write uncompressed size: %w", err) + } + // Write tag blocks count if err := binary.Write(buf, binary.LittleEndian, uint32(len(bm.tagsBlocks))); err != nil { return nil, fmt.Errorf("failed to write tag blocks count: %w", err) @@ -366,6 +385,28 @@ func unmarshalBlockMetadata(data []byte) (*blockMetadata, error) { return nil, fmt.Errorf("failed to read keys block size: %w", err) } + // Read keys encoding information + var encodeTypeByte byte + if err := binary.Read(buf, binary.LittleEndian, &encodeTypeByte); err != nil { + releaseBlockMetadata(bm) + return nil, fmt.Errorf("failed to read keys encode type: %w", err) + } + bm.keysEncodeType = encoding.EncodeType(encodeTypeByte) + if err := binary.Read(buf, binary.LittleEndian, &bm.minKey); err != nil { + releaseBlockMetadata(bm) + return nil, fmt.Errorf("failed to read keys first value: %w", err) + } + + // Read count and uncompressed size + if err := binary.Read(buf, binary.LittleEndian, &bm.count); err != nil { + releaseBlockMetadata(bm) + return nil, fmt.Errorf("failed to read count: %w", err) + } + if err := binary.Read(buf, binary.LittleEndian, &bm.uncompressedSize); err != nil { + releaseBlockMetadata(bm) + return nil, fmt.Errorf("failed to read uncompressed size: %w", err) + } + // Read tag blocks count var tagBlocksCount uint32 if err := binary.Read(buf, binary.LittleEndian, &tagBlocksCount); err != nil { @@ -467,12 +508,7 @@ func (bm *blockMetadata) addTagBlock(tagName string, offset, size uint64) { bm.tagsBlocks[tagName] = dataBlock{offset: offset, size: size} } -// getTagMetadata gets or creates a tag metadata reference in the block metadata. -func (bm *blockMetadata) getTagMetadata(tagName string) *dataBlock { - if _, exists := bm.tagsBlocks[tagName]; !exists { - bm.tagsBlocks[tagName] = dataBlock{} - } - // Return pointer to the dataBlock for the tag - block := bm.tagsBlocks[tagName] - return &block +// setTagMetadata sets the tag metadata reference in the block metadata. +func (bm *blockMetadata) setTagMetadata(tagName string, offset, size uint64) { + bm.tagsBlocks[tagName] = dataBlock{offset: offset, size: size} } diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go index c08af669..f51cb719 100644 --- a/banyand/internal/sidx/part.go +++ b/banyand/internal/sidx/part.go @@ -24,8 +24,11 @@ import ( "sort" "strings" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/bytes" + "github.com/apache/skywalking-banyandb/pkg/compress/zstd" + "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/pool" @@ -58,6 +61,7 @@ type part struct { primary fs.Reader data fs.Reader keys fs.Reader + meta fs.Reader fileSystem fs.FileSystem tagData map[string]fs.Reader tagMetadata map[string]fs.Reader @@ -80,6 +84,7 @@ func mustOpenPart(path string, fileSystem fs.FileSystem) *part { p.primary = mustOpenReader(filepath.Join(path, primaryFilename), fileSystem) p.data = mustOpenReader(filepath.Join(path, dataFilename), fileSystem) p.keys = mustOpenReader(filepath.Join(path, keysFilename), fileSystem) + p.meta = mustOpenReader(filepath.Join(path, metaFilename), fileSystem) // Load part metadata from meta.bin. if err := p.loadPartMetadata(); err != nil { @@ -131,10 +136,10 @@ func (p *part) loadPartMetadata() error { return nil } -// loadBlockMetadata reads and parses primary block metadata from primary.bin. +// loadBlockMetadata reads and parses primary block metadata from meta.bin. func (p *part) loadPrimaryBlockMetadata() error { - // Load primary block metadata from primary.bin file - p.primaryBlockMetadata = mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], p.primary) + // Load primary block metadata from meta.bin file (compressed primaryBlockMetadata) + p.primaryBlockMetadata = mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], p.meta) return nil } @@ -225,6 +230,9 @@ func (p *part) close() { if p.keys != nil { fs.MustClose(p.keys) } + if p.meta != nil { + fs.MustClose(p.meta) + } // Close tag files. for _, reader := range p.tagData { @@ -255,6 +263,188 @@ func mustOpenReader(filePath string, fileSystem fs.FileSystem) fs.Reader { return file } +// readAll reads all blocks from the part and returns them as separate elements. +// Each elements collection represents the data from a single block. +func (p *part) readAll() ([]*elements, error) { + if len(p.primaryBlockMetadata) == 0 { + return nil, nil + } + + result := make([]*elements, 0, len(p.primaryBlockMetadata)) + compressedPrimaryBuf := make([]byte, 0, 1024) + primaryBuf := make([]byte, 0, 1024) + compressedDataBuf := make([]byte, 0, 1024) + dataBuf := make([]byte, 0, 1024) + compressedKeysBuf := make([]byte, 0, 1024) + keysBuf := make([]byte, 0, 1024) + + bytesDecoder := &encoding.BytesBlockDecoder{} + + for _, pbm := range p.primaryBlockMetadata { + // Read and decompress primary block metadata + compressedPrimaryBuf = bytes.ResizeOver(compressedPrimaryBuf, int(pbm.size)) + fs.MustReadData(p.primary, int64(pbm.offset), compressedPrimaryBuf) + + var err error + primaryBuf, err = zstd.Decompress(primaryBuf[:0], compressedPrimaryBuf) + if err != nil { + // Clean up any elements created so far + for _, e := range result { + releaseElements(e) + } + return nil, fmt.Errorf("cannot decompress primary block: %w", err) + } + + // Unmarshal block metadata + bm, err := unmarshalBlockMetadata(primaryBuf) + if err != nil { + // Clean up any elements created so far + for _, e := range result { + releaseElements(e) + } + return nil, fmt.Errorf("cannot unmarshal block metadata: %w", err) + } + + // Create elements for this block + elems := generateElements() + + // Read user keys + compressedKeysBuf = bytes.ResizeOver(compressedKeysBuf, int(bm.keysBlock.size)) + fs.MustReadData(p.keys, int64(bm.keysBlock.offset), compressedKeysBuf) + + keysBuf, err = zstd.Decompress(keysBuf[:0], compressedKeysBuf) + if err != nil { + releaseElements(elems) + for _, e := range result { + releaseElements(e) + } + return nil, fmt.Errorf("cannot decompress keys block: %w", err) + } + + // Decode user keys using the stored encoding information + elems.userKeys, err = encoding.BytesToInt64List(elems.userKeys[:0], keysBuf, bm.keysEncodeType, bm.minKey, int(bm.count)) + if err != nil { + releaseElements(elems) + for _, e := range result { + releaseElements(e) + } + return nil, fmt.Errorf("cannot decode user keys: %w", err) + } + + // Read data payloads + compressedDataBuf = bytes.ResizeOver(compressedDataBuf, int(bm.dataBlock.size)) + fs.MustReadData(p.data, int64(bm.dataBlock.offset), compressedDataBuf) + + dataBuf, err = zstd.Decompress(dataBuf[:0], compressedDataBuf) + if err != nil { + releaseElements(elems) + for _, e := range result { + releaseElements(e) + } + return nil, fmt.Errorf("cannot decompress data block: %w", err) + } + + // Decode data payloads + bytesDecoder.Reset() + elems.data, err = bytesDecoder.Decode(elems.data[:0], dataBuf, bm.count) + if err != nil { + releaseElements(elems) + for _, e := range result { + releaseElements(e) + } + return nil, fmt.Errorf("cannot decode data payloads: %w", err) + } + + // Initialize seriesIDs and tags slices + elems.seriesIDs = make([]common.SeriesID, int(bm.count)) + elems.tags = make([][]tag, int(bm.count)) + + // Fill seriesIDs - all elements in this block have the same seriesID + for i := range elems.seriesIDs { + elems.seriesIDs[i] = bm.seriesID + } + + // Read tags for each tag name + for tagName := range bm.tagsBlocks { + err = p.readBlockTags(tagName, bm, elems) + if err != nil { + releaseElements(elems) + for _, e := range result { + releaseElements(e) + } + return nil, fmt.Errorf("cannot read tags for %s: %w", tagName, err) + } + } + + result = append(result, elems) + } + + return result, nil +} + +// readBlockTags reads and decodes tag data for a specific tag in a block. +func (p *part) readBlockTags(tagName string, bm *blockMetadata, elems *elements) error { + tagBlockInfo, exists := bm.tagsBlocks[tagName] + if !exists { + return fmt.Errorf("tag block info not found for tag: %s", tagName) + } + + // Get tag metadata reader + tmReader, tmExists := p.getTagMetadataReader(tagName) + if !tmExists { + return fmt.Errorf("tag metadata reader not found for tag: %s", tagName) + } + + // Get tag data reader + tdReader, tdExists := p.getTagDataReader(tagName) + if !tdExists { + return fmt.Errorf("tag data reader not found for tag: %s", tagName) + } + + // Read tag metadata + tmData := make([]byte, tagBlockInfo.size) + fs.MustReadData(tmReader, int64(tagBlockInfo.offset), tmData) + + tm, err := unmarshalTagMetadata(tmData) + if err != nil { + return fmt.Errorf("cannot unmarshal tag metadata: %w", err) + } + defer releaseTagMetadata(tm) + + // Read and decompress tag data + tdData := make([]byte, tm.dataBlock.size) + fs.MustReadData(tdReader, int64(tm.dataBlock.offset), tdData) + + decompressedData, err := zstd.Decompress(nil, tdData) + if err != nil { + return fmt.Errorf("cannot decompress tag data: %w", err) + } + + // Decode tag values + tagValues, err := DecodeTagValues(decompressedData, tm.valueType, int(bm.count)) + if err != nil { + return fmt.Errorf("cannot decode tag values: %w", err) + } + + // Add tag values to elements + for i, value := range tagValues { + if i >= len(elems.tags) { + break + } + if elems.tags[i] == nil { + elems.tags[i] = make([]tag, 0, 1) + } + elems.tags[i] = append(elems.tags[i], tag{ + name: tagName, + value: value, + valueType: tm.valueType, + indexed: tm.indexed, + }) + } + + return nil +} + // String returns a string representation of the part. func (p *part) String() string { if p.partMetadata != nil { @@ -426,10 +616,11 @@ func (mp *memPart) mustInitFromElements(es *elements) { if i == len(es.seriesIDs) || es.seriesIDs[i] != currentSeriesID { // Extract elements for current series seriesUserKeys := es.userKeys[blockStart:i] + seriesData := es.data[blockStart:i] seriesTags := es.tags[blockStart:i] // Write elements for this series - bw.MustWriteElements(currentSeriesID, seriesUserKeys, seriesTags) + bw.MustWriteElements(currentSeriesID, seriesUserKeys, seriesData, seriesTags) if i < len(es.seriesIDs) { currentSeriesID = es.seriesIDs[i] @@ -453,10 +644,10 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, partPath string) { fileSystem.MkdirPanicIfExist(partPath, storage.DirPerm) // Write core files - fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(partPath, metaFilename), storage.FilePerm) fs.MustFlush(fileSystem, mp.primary.Buf, filepath.Join(partPath, primaryFilename), storage.FilePerm) fs.MustFlush(fileSystem, mp.data.Buf, filepath.Join(partPath, dataFilename), storage.FilePerm) fs.MustFlush(fileSystem, mp.keys.Buf, filepath.Join(partPath, keysFilename), storage.FilePerm) + fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(partPath, metaFilename), storage.FilePerm) // Write individual tag files for tagName, td := range mp.tagData { @@ -507,12 +698,16 @@ func openMemPart(mp *memPart) *part { *p.partMetadata = *mp.partMetadata } - // Block metadata is now handled via blockMetadataArray parameter + // Load primary block metadata from meta buffer + if len(mp.meta.Buf) > 0 { + p.primaryBlockMetadata = mustReadPrimaryBlockMetadata(nil, &mp.meta) + } // Open data files as readers from memory buffers p.primary = &mp.primary p.data = &mp.data p.keys = &mp.keys + p.meta = &mp.meta // Open individual tag files if they exist if mp.tagData != nil { diff --git a/banyand/internal/sidx/part_test.go b/banyand/internal/sidx/part_test.go index c430191e..71f62ef6 100644 --- a/banyand/internal/sidx/part_test.go +++ b/banyand/internal/sidx/part_test.go @@ -20,13 +20,16 @@ package sidx import ( "fmt" "path/filepath" + "sort" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/fs" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) func TestExtractTagNameAndExtension(t *testing.T) { @@ -96,386 +99,6 @@ func TestExtractTagNameAndExtension(t *testing.T) { } } -func TestPartLifecycleManagement(t *testing.T) { - // Create test filesystem - testFS := fs.NewLocalFileSystem() - tempDir := t.TempDir() - - // Create test part metadata - pm := generatePartMetadata() - pm.ID = 12345 - pm.MinKey = 100 - pm.MaxKey = 999 - pm.BlocksCount = 2 - pm.CompressedSizeBytes = 1024 - pm.UncompressedSizeBytes = 2048 - pm.TotalCount = 50 - - // Marshal metadata to create test files - metaData, err := pm.marshal() - require.NoError(t, err) - - // Create valid primary block metadata - pbm := primaryBlockMetadata{ - seriesID: 1, - minKey: 100, - maxKey: 999, - dataBlock: dataBlock{ - offset: 0, - size: 1024, - }, - } - - // Marshal and compress primary block metadata - primaryData := pbm.marshal(nil) - compressedPrimaryData := zstd.Compress(nil, primaryData, 1) - - // Create required files - testFiles := map[string][]byte{ - primaryFilename: compressedPrimaryData, - dataFilename: []byte("data content"), - keysFilename: []byte("keys content"), - metaFilename: metaData, - } - - for fileName, content := range testFiles { - filePath := filepath.Join(tempDir, fileName) - _, err := testFS.Write(content, filePath, 0o644) - require.NoError(t, err) - } - - // Test part opening - part := mustOpenPart(tempDir, testFS) - require.NotNil(t, part) - defer part.close() - - // Verify part properties - assert.Equal(t, tempDir, part.path) - assert.Equal(t, testFS, part.fileSystem) - assert.NotNil(t, part.primary) - assert.NotNil(t, part.data) - assert.NotNil(t, part.keys) - - // Verify metadata was loaded - require.NotNil(t, part.partMetadata) - assert.Equal(t, uint64(12345), part.partMetadata.ID) - assert.Equal(t, int64(100), part.partMetadata.MinKey) - assert.Equal(t, int64(999), part.partMetadata.MaxKey) - assert.Equal(t, uint64(2), part.partMetadata.BlocksCount) - - // Verify String method - expectedString := fmt.Sprintf("sidx part %d at %s", pm.ID, tempDir) - assert.Equal(t, expectedString, part.String()) - - // Test accessors - assert.Equal(t, part.partMetadata, part.getPartMetadata()) - assert.Equal(t, tempDir, part.Path()) - - // Cleanup - releasePartMetadata(pm) -} - -func TestPartWithTagFiles(t *testing.T) { - // Create test filesystem - testFS := fs.NewLocalFileSystem() - tempDir := t.TempDir() - - // Create minimal required files first - pm := generatePartMetadata() - pm.ID = 67890 - pm.BlocksCount = 1 - metaData, err := pm.marshal() - require.NoError(t, err) - - // Create valid primary block metadata - pbm := primaryBlockMetadata{ - seriesID: 1, - minKey: 0, - maxKey: 100, - dataBlock: dataBlock{ - offset: 0, - size: 512, - }, - } - - // Marshal and compress primary block metadata - primaryData := pbm.marshal(nil) - compressedPrimaryData := zstd.Compress(nil, primaryData, 1) - - requiredFiles := map[string][]byte{ - primaryFilename: compressedPrimaryData, - dataFilename: []byte("data"), - keysFilename: []byte("keys"), - metaFilename: metaData, - } - - for fileName, content := range requiredFiles { - filePath := filepath.Join(tempDir, fileName) - _, err := testFS.Write(content, filePath, 0o644) - require.NoError(t, err) - } - - // Create tag files - tagFiles := map[string][]byte{ - "user_id.td": []byte("user id tag data"), - "user_id.tm": []byte("user id tag metadata"), - "user_id.tf": []byte("user id tag filter"), - "service_name.td": []byte("service name tag data"), - "service_name.tf": []byte("service name tag filter"), - "endpoint.tm": []byte("endpoint tag metadata"), - // Invalid files that should be ignored - "invalid": []byte("invalid tag file"), - "invalid.unknown": []byte("unknown extension"), - "regular_file.txt": []byte("regular file"), - } - - for fileName, content := range tagFiles { - filePath := filepath.Join(tempDir, fileName) - _, err := testFS.Write(content, filePath, 0o644) - require.NoError(t, err) - } - - // Open part - part := mustOpenPart(tempDir, testFS) - require.NotNil(t, part) - defer part.close() - - // Verify tag files were opened correctly - assert.Equal(t, 2, len(part.tagData)) // user_id, service_name - assert.Equal(t, 2, len(part.tagMetadata)) // user_id, endpoint - assert.Equal(t, 2, len(part.tagFilters)) // user_id, service_name - - // Test tag data readers - userIDData, exists := part.getTagDataReader("user_id") - assert.True(t, exists) - assert.NotNil(t, userIDData) - - serviceNameData, exists := part.getTagDataReader("service_name") - assert.True(t, exists) - assert.NotNil(t, serviceNameData) - - _, exists = part.getTagDataReader("endpoint") - assert.False(t, exists) // Only has metadata, not data - - // Test tag metadata readers - userIDMeta, exists := part.getTagMetadataReader("user_id") - assert.True(t, exists) - assert.NotNil(t, userIDMeta) - - endpointMeta, exists := part.getTagMetadataReader("endpoint") - assert.True(t, exists) - assert.NotNil(t, endpointMeta) - - _, exists = part.getTagMetadataReader("service_name") - assert.False(t, exists) // Only has data and filter, not metadata - - // Test tag filter readers - userIDFilter, exists := part.getTagFilterReader("user_id") - assert.True(t, exists) - assert.NotNil(t, userIDFilter) - - serviceNameFilter, exists := part.getTagFilterReader("service_name") - assert.True(t, exists) - assert.NotNil(t, serviceNameFilter) - - _, exists = part.getTagFilterReader("endpoint") - assert.False(t, exists) // Only has metadata, not filter - - // Test available tag names - tagNames := part.getAvailableTagNames() - assert.Equal(t, 3, len(tagNames)) - expectedTags := map[string]bool{ - "user_id": false, - "service_name": false, - "endpoint": false, - } - for _, tagName := range tagNames { - _, exists := expectedTags[tagName] - assert.True(t, exists, "unexpected tag name: %s", tagName) - expectedTags[tagName] = true - } - for tagName, found := range expectedTags { - assert.True(t, found, "missing expected tag name: %s", tagName) - } - - // Test hasTagFiles - assert.True(t, part.hasTagFiles("user_id")) // Has all three types - assert.True(t, part.hasTagFiles("service_name")) // Has data and filter - assert.True(t, part.hasTagFiles("endpoint")) // Has metadata only - assert.False(t, part.hasTagFiles("nonexistent")) // Doesn't exist - - // Cleanup - releasePartMetadata(pm) -} - -func TestPartErrorHandling(t *testing.T) { - testFS := fs.NewLocalFileSystem() - tempDir := t.TempDir() - - // Test 1: Missing required files should panic - t.Run("missing_files_panic", func(t *testing.T) { - defer func() { - if r := recover(); r == nil { - t.Error("expected panic when opening part with missing files") - } - }() - mustOpenPart(tempDir, testFS) - }) - - // Test 2: Invalid metadata should panic - t.Run("invalid_metadata_panic", func(t *testing.T) { - defer func() { - if r := recover(); r == nil { - t.Error("expected panic when loading invalid metadata") - } - }() - - // Create required files with invalid metadata - testFiles := map[string][]byte{ - primaryFilename: []byte("primary"), - dataFilename: []byte("data"), - keysFilename: []byte("keys"), - metaFilename: []byte("invalid json metadata"), - } - - for fileName, content := range testFiles { - filePath := filepath.Join(tempDir, fileName) - _, err := testFS.Write(content, filePath, 0o644) - require.NoError(t, err) - } - - mustOpenPart(tempDir, testFS) - }) -} - -func TestPartClosingBehavior(t *testing.T) { - testFS := fs.NewLocalFileSystem() - tempDir := t.TempDir() - - // Create test part - pm := generatePartMetadata() - pm.ID = 11111 - pm.BlocksCount = 1 - metaData, err := pm.marshal() - require.NoError(t, err) - - // Create valid primary block metadata - pbm := primaryBlockMetadata{ - seriesID: 1, - minKey: 0, - maxKey: 100, - dataBlock: dataBlock{ - offset: 0, - size: 256, - }, - } - - // Marshal and compress primary block metadata - primaryData := pbm.marshal(nil) - compressedPrimaryData := zstd.Compress(nil, primaryData, 1) - - testFiles := map[string][]byte{ - primaryFilename: compressedPrimaryData, - dataFilename: []byte("data"), - keysFilename: []byte("keys"), - metaFilename: metaData, - "test.td": []byte("tag data"), - "test.tm": []byte("tag metadata"), - "test.tf": []byte("tag filter"), - } - - for fileName, content := range testFiles { - filePath := filepath.Join(tempDir, fileName) - _, err := testFS.Write(content, filePath, 0o644) - require.NoError(t, err) - } - - // Open part - part := mustOpenPart(tempDir, testFS) - require.NotNil(t, part) - - // Verify it's properly opened - assert.NotNil(t, part.primary) - assert.NotNil(t, part.data) - assert.NotNil(t, part.keys) - assert.Equal(t, 1, len(part.tagData)) - assert.Equal(t, 1, len(part.tagMetadata)) - assert.Equal(t, 1, len(part.tagFilters)) - assert.NotNil(t, part.partMetadata) - - // Close the part - part.close() - - // Verify resources are cleaned up - // Note: We can't directly test that files are closed since fs.Reader - // doesn't expose that state, but we can verify that metadata is released - assert.Nil(t, part.partMetadata) - - // Test closing with defensive programming (nil check in close method) - // The close method should handle nil pointers gracefully - - // Cleanup - releasePartMetadata(pm) -} - -func TestPartMemoryManagement(t *testing.T) { - testFS := fs.NewLocalFileSystem() - tempDir := t.TempDir() - - // Create test part with block metadata - pm := generatePartMetadata() - pm.ID = 22222 - pm.BlocksCount = 3 // Test with multiple blocks - metaData, err := pm.marshal() - require.NoError(t, err) - - // Create valid primary block metadata - pbm := primaryBlockMetadata{ - seriesID: 1, - minKey: 0, - maxKey: 100, - dataBlock: dataBlock{ - offset: 0, - size: 256, - }, - } - - // Marshal and compress primary block metadata - primaryData := pbm.marshal(nil) - compressedPrimaryData := zstd.Compress(nil, primaryData, 1) - - testFiles := map[string][]byte{ - primaryFilename: compressedPrimaryData, - dataFilename: []byte("data"), - keysFilename: []byte("keys"), - metaFilename: metaData, - } - - for fileName, content := range testFiles { - filePath := filepath.Join(tempDir, fileName) - _, err := testFS.Write(content, filePath, 0o644) - require.NoError(t, err) - } - - // Open and immediately close multiple parts to test memory management - for i := 0; i < 10; i++ { - part := mustOpenPart(tempDir, testFS) - require.NotNil(t, part) - - // Verify part was created correctly - assert.NotNil(t, part.partMetadata) - - // Close immediately - part.close() - - // Verify cleanup - assert.Nil(t, part.partMetadata) - } - - // Cleanup - releasePartMetadata(pm) -} - func TestPartStringRepresentation(t *testing.T) { testFS := fs.NewLocalFileSystem() tempDir := t.TempDir() @@ -529,126 +152,379 @@ func TestPartStringRepresentation(t *testing.T) { releasePartMetadata(pm) } -// Benchmark tests for performance validation. -func BenchmarkPartOpen(b *testing.B) { - testFS := fs.NewLocalFileSystem() - tempDir := b.TempDir() - - // Setup test data - pm := generatePartMetadata() - pm.ID = 77777 - pm.BlocksCount = 5 - metaData, err := pm.marshal() - require.NoError(b, err) - - // Create valid primary block metadata - pbm := primaryBlockMetadata{ - seriesID: 1, - minKey: 0, - maxKey: 1000, - dataBlock: dataBlock{ - offset: 0, - size: 1024, +func TestMemPartInitialization(t *testing.T) { + // Create simple test elements + elems := createTestElements([]testElement{ + { + seriesID: 1, + userKey: 100, + data: []byte("data1"), + tags: []tag{ + { + name: "service", + value: []byte("order-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + }, }, - } + }) + defer releaseElements(elems) - // Marshal and compress primary block metadata - primaryData := pbm.marshal(nil) - compressedPrimaryData := zstd.Compress(nil, primaryData, 1) + t.Run("memPart initialization", func(t *testing.T) { + mp := generateMemPart() + defer releaseMemPart(mp) - testFiles := map[string][]byte{ - primaryFilename: compressedPrimaryData, - dataFilename: []byte("data content for benchmark"), - keysFilename: []byte("keys content for benchmark"), - metaFilename: metaData, - } + // Check if memPart is properly initialized from elements + mp.mustInitFromElements(elems) - // Add multiple tag files - for i := 0; i < 10; i++ { - tagName := fmt.Sprintf("tag_%d", i) - testFiles[fmt.Sprintf("%s.td", tagName)] = []byte(fmt.Sprintf("tag data %d", i)) - testFiles[fmt.Sprintf("%s.tm", tagName)] = []byte(fmt.Sprintf("tag metadata %d", i)) - testFiles[fmt.Sprintf("%s.tf", tagName)] = []byte(fmt.Sprintf("tag filter %d", i)) - } + t.Logf("Primary buffer length: %d", len(mp.primary.Buf)) + t.Logf("Data buffer length: %d", len(mp.data.Buf)) + t.Logf("Keys buffer length: %d", len(mp.keys.Buf)) - for fileName, content := range testFiles { - filePath := filepath.Join(tempDir, fileName) - _, err := testFS.Write(content, filePath, 0o644) - require.NoError(b, err) - } + // Check partMetadata + if mp.partMetadata != nil { + t.Logf("Part metadata blocks count: %d", mp.partMetadata.BlocksCount) + t.Logf("Part metadata total count: %d", mp.partMetadata.TotalCount) + } else { + t.Log("Part metadata is nil") + } - b.ResetTimer() - b.ReportAllocs() + // Check tag metadata + for tagName, buf := range mp.tagMetadata { + t.Logf("Tag metadata for %s: %d bytes", tagName, len(buf.Buf)) + } - for i := 0; i < b.N; i++ { - part := mustOpenPart(tempDir, testFS) - part.close() - } + // Check tag data + for tagName, buf := range mp.tagData { + t.Logf("Tag data for %s: %d bytes", tagName, len(buf.Buf)) + } - // Cleanup - releasePartMetadata(pm) + // Verify primary buffer has some data + if len(mp.primary.Buf) > 0 { + t.Log("✓ Primary buffer contains data") + } else { + t.Log("⚠ Primary buffer is empty") + } + }) } -func BenchmarkPartTagAccess(b *testing.B) { +func TestMemPartFlushAndReadAllRoundTrip(t *testing.T) { + // Test case that flushes to disk first, then reads back + // This works around issues with in-memory primary metadata testFS := fs.NewLocalFileSystem() - tempDir := b.TempDir() - - // Setup test part with many tag files - pm := generatePartMetadata() - pm.ID = 88888 - pm.BlocksCount = 1 - metaData, err := pm.marshal() - require.NoError(b, err) + tempDir := t.TempDir() - // Create valid primary block metadata - pbm := primaryBlockMetadata{ - seriesID: 1, - minKey: 0, - maxKey: 100, - dataBlock: dataBlock{ - offset: 0, - size: 256, + tests := []struct { + name string + elements *elements + }{ + { + name: "single series with single element", + elements: createTestElements([]testElement{ + { + seriesID: 1, + userKey: 100, + data: []byte("data1"), + tags: []tag{ + { + name: "service", + value: []byte("order-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + }, + }, + }), + }, + { + name: "single series with multiple elements", + elements: createTestElements([]testElement{ + { + seriesID: 1, + userKey: 100, + data: []byte("data1"), + tags: []tag{ + { + name: "service", + value: []byte("order-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + { + name: "method", + value: []byte("POST"), + valueType: pbv1.ValueTypeStr, + indexed: false, + }, + }, + }, + { + seriesID: 1, + userKey: 101, + data: []byte("data2"), + tags: []tag{ + { + name: "service", + value: []byte("order-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + { + name: "method", + value: []byte("GET"), + valueType: pbv1.ValueTypeStr, + indexed: false, + }, + }, + }, + }), + }, + { + name: "multiple series with multiple elements", + elements: createTestElements([]testElement{ + { + seriesID: 1, + userKey: 100, + data: []byte("data1"), + tags: []tag{ + { + name: "service", + value: []byte("order-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + { + name: "duration", + value: []byte{0, 0, 0, 0, 0, 0, 0, 100}, // int64: 100 + valueType: pbv1.ValueTypeInt64, + indexed: false, + }, + }, + }, + { + seriesID: 1, + userKey: 101, + data: []byte("data2"), + tags: []tag{ + { + name: "service", + value: []byte("order-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + { + name: "duration", + value: []byte{0, 0, 0, 0, 0, 0, 0, 150}, // int64: 150 + valueType: pbv1.ValueTypeInt64, + indexed: false, + }, + }, + }, + { + seriesID: 2, + userKey: 200, + data: []byte("data3"), + tags: []tag{ + { + name: "service", + value: []byte("payment-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + { + name: "duration", + value: []byte{0, 0, 0, 0, 0, 0, 0, 75}, // int64: 75 + valueType: pbv1.ValueTypeInt64, + indexed: false, + }, + }, + }, + }), + }, + { + name: "elements with missing tags", + elements: createTestElements([]testElement{ + { + seriesID: 1, + userKey: 100, + data: []byte("data1"), + tags: []tag{ + { + name: "service", + value: []byte("order-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + }, + }, + { + seriesID: 1, + userKey: 101, + data: []byte("data2"), + tags: []tag{ + { + name: "service", + value: []byte("order-service"), + valueType: pbv1.ValueTypeStr, + indexed: true, + }, + { + name: "endpoint", + value: []byte("/api/orders"), + valueType: pbv1.ValueTypeStr, + indexed: false, + }, + }, + }, + }), }, } - // Marshal and compress primary block metadata - primaryData := pbm.marshal(nil) - compressedPrimaryData := zstd.Compress(nil, primaryData, 1) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Step 1: Create memPart and initialize with elements + mp := generateMemPart() + defer releaseMemPart(mp) + + // Create a copy of elements for initialization to avoid modifying the original + elementsCopy := generateElements() + defer releaseElements(elementsCopy) + + // Deep copy the elements + elementsCopy.seriesIDs = append(elementsCopy.seriesIDs, tt.elements.seriesIDs...) + elementsCopy.userKeys = append(elementsCopy.userKeys, tt.elements.userKeys...) + elementsCopy.data = append(elementsCopy.data, tt.elements.data...) + elementsCopy.tags = append(elementsCopy.tags, tt.elements.tags...) + + // Initialize memPart from elements copy + mp.mustInitFromElements(elementsCopy) + + // Step 2: Flush memPart to disk + partDir := filepath.Join(tempDir, fmt.Sprintf("part_%s", tt.name)) + mp.mustFlush(testFS, partDir) + + // Step 3: Open the flushed part from disk + part := mustOpenPart(partDir, testFS) + defer part.close() + + + // Step 4: Read all elements back from part + resultElements, err := part.readAll() + require.NoError(t, err, "readAll should not return error") + + // Step 5: Verify the result + require.NotEmpty(t, resultElements, "should return at least one elements collection") + + // Combine all returned elements into a single collection for comparison + combined := generateElements() + defer releaseElements(combined) + + for _, elems := range resultElements { + combined.seriesIDs = append(combined.seriesIDs, elems.seriesIDs...) + combined.userKeys = append(combined.userKeys, elems.userKeys...) + combined.data = append(combined.data, elems.data...) + combined.tags = append(combined.tags, elems.tags...) + } - testFiles := map[string][]byte{ - primaryFilename: compressedPrimaryData, - dataFilename: []byte("data"), - keysFilename: []byte("keys"), - metaFilename: metaData, + // Create a clean copy of original elements for comparison (avoid sorting corruption) + originalCopy := generateElements() + defer releaseElements(originalCopy) + + originalCopy.seriesIDs = append(originalCopy.seriesIDs, tt.elements.seriesIDs...) + originalCopy.userKeys = append(originalCopy.userKeys, tt.elements.userKeys...) + originalCopy.data = append(originalCopy.data, tt.elements.data...) + originalCopy.tags = append(originalCopy.tags, tt.elements.tags...) + + // Sort both original copy and result for comparison + sort.Sort(originalCopy) + sort.Sort(combined) + + // Verify elements match + compareElements(t, originalCopy, combined) + + // Clean up individual result elements + for _, elems := range resultElements { + releaseElements(elems) + } + }) } +} - // Add many tag files - numTags := 100 - for i := 0; i < numTags; i++ { - tagName := fmt.Sprintf("tag_%d", i) - testFiles[fmt.Sprintf("%s.td", tagName)] = []byte(fmt.Sprintf("data %d", i)) - testFiles[fmt.Sprintf("%s.tm", tagName)] = []byte(fmt.Sprintf("meta %d", i)) - testFiles[fmt.Sprintf("%s.tf", tagName)] = []byte(fmt.Sprintf("filter %d", i)) - } +// testElement represents a single element for test creation +type testElement struct { + seriesID common.SeriesID + userKey int64 + data []byte + tags []tag +} - for fileName, content := range testFiles { - filePath := filepath.Join(tempDir, fileName) - _, err := testFS.Write(content, filePath, 0o644) - require.NoError(b, err) +// createTestElements creates an elements collection from test data +func createTestElements(testElems []testElement) *elements { + elems := generateElements() + + for _, te := range testElems { + elems.seriesIDs = append(elems.seriesIDs, te.seriesID) + elems.userKeys = append(elems.userKeys, te.userKey) + + // Copy data + dataCopy := make([]byte, len(te.data)) + copy(dataCopy, te.data) + elems.data = append(elems.data, dataCopy) + + // Copy tags + tagsCopy := make([]tag, len(te.tags)) + for i, t := range te.tags { + tagsCopy[i] = tag{ + name: t.name, + value: append([]byte(nil), t.value...), + valueType: t.valueType, + indexed: t.indexed, + } + } + elems.tags = append(elems.tags, tagsCopy) } - part := mustOpenPart(tempDir, testFS) - defer part.close() + return elems +} - b.ResetTimer() - b.ReportAllocs() +// compareElements compares two elements collections for equality +func compareElements(t *testing.T, expected, actual *elements) { + require.Equal(t, len(expected.seriesIDs), len(actual.seriesIDs), "seriesIDs length mismatch") + require.Equal(t, len(expected.userKeys), len(actual.userKeys), "userKeys length mismatch") + require.Equal(t, len(expected.data), len(actual.data), "data length mismatch") + require.Equal(t, len(expected.tags), len(actual.tags), "tags length mismatch") + + for i := 0; i < len(expected.seriesIDs); i++ { + assert.Equal(t, expected.seriesIDs[i], actual.seriesIDs[i], "seriesID mismatch at index %d", i) + assert.Equal(t, expected.userKeys[i], actual.userKeys[i], "userKey mismatch at index %d", i) + assert.Equal(t, expected.data[i], actual.data[i], "data mismatch at index %d", i) + + // Compare tags + require.Equal(t, len(expected.tags[i]), len(actual.tags[i]), "tags length mismatch at element %d", i) + + // Sort tags by name for consistent comparison + expectedTags := make([]tag, len(expected.tags[i])) + actualTags := make([]tag, len(actual.tags[i])) + copy(expectedTags, expected.tags[i]) + copy(actualTags, actual.tags[i]) + + sort.Slice(expectedTags, func(a, b int) bool { + return expectedTags[a].name < expectedTags[b].name + }) + sort.Slice(actualTags, func(a, b int) bool { + return actualTags[a].name < actualTags[b].name + }) - for i := 0; i < b.N; i++ { - tagName := fmt.Sprintf("tag_%d", i%numTags) - _, exists := part.getTagDataReader(tagName) - assert.True(b, exists) + for j := 0; j < len(expectedTags); j++ { + assert.Equal(t, expectedTags[j].name, actualTags[j].name, + "tag name mismatch at element %d, tag %d", i, j) + assert.Equal(t, expectedTags[j].value, actualTags[j].value, + "tag value mismatch at element %d, tag %d (%s)", i, j, expectedTags[j].name) + assert.Equal(t, expectedTags[j].valueType, actualTags[j].valueType, + "tag valueType mismatch at element %d, tag %d (%s)", i, j, expectedTags[j].name) + assert.Equal(t, expectedTags[j].indexed, actualTags[j].indexed, + "tag indexed mismatch at element %d, tag %d (%s)", i, j, expectedTags[j].name) + } } - - // Cleanup - releasePartMetadata(pm) }
