This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch perf/trace-vs-stream
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5e9fe68b5131c641c0226e8ed250d9b5f2452abb
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Sep 29 14:33:08 2025 +0000

    refactor: streamline block metadata handling and improve memory management
    
    - Removed unnecessary setter methods for series ID and key range in 
blockMetadata; callers now access the fields directly.
    - Updated blockWriter to marshal metadata more efficiently during block 
writing.
    - Enhanced error handling in block reading functions to ensure offsets 
match expected values.
    - Refactored span reading and writing functions to utilize a decoder for 
improved performance and clarity.
    - Introduced a new MergeMemParts method on the SIDX interface to 
improve part management.
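    
    A minimal sketch of how the new method can be driven by callers such as
    the trace merger. The trimmed interface and helper below are illustrative
    only; the real definition lives in banyand/internal/sidx/interfaces.go:
    
        package example
    
        // SIDX is a trimmed view of the secondary index interface after this
        // change; the actual interface carries more methods.
        type SIDX interface {
                Flush() error
                Merge(closeCh <-chan struct{}) error
                MergeMemParts(closeCh <-chan struct{}) error
        }
    
        // mergeAllMemParts mirrors the call pattern added to
        // banyand/trace/merger.go: each sidx instance compacts only its
        // in-memory parts, leaving file-backed parts for regular Merge.
        func mergeAllMemParts(instances []SIDX, closeCh <-chan struct{}) error {
                for _, s := range instances {
                        if err := s.MergeMemParts(closeCh); err != nil {
                                return err
                        }
                }
                return nil
        }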
---
 banyand/internal/sidx/TODO.md                      | 301 ---------------------
 banyand/internal/sidx/block.go                     |  18 +-
 banyand/internal/sidx/block_writer.go              |   5 +-
 banyand/internal/sidx/interfaces.go                |   2 +
 banyand/internal/sidx/introducer.go                |   5 +-
 banyand/internal/sidx/introducer_test.go           |   6 +-
 banyand/internal/sidx/merge.go                     |  50 +++-
 banyand/internal/sidx/metadata.go                  |  12 -
 banyand/internal/sidx/metadata_test.go             |   5 +-
 banyand/internal/sidx/multi_sidx_query_test.go     |   4 +
 banyand/internal/sidx/part_iter.go                 |   3 -
 banyand/internal/sidx/part_wrapper.go              |  21 +-
 banyand/internal/sidx/part_wrapper_test.go         |   2 +-
 banyand/internal/sidx/query_result.go              |   4 +
 banyand/internal/sidx/sidx.go                      |   5 +-
 banyand/internal/sidx/snapshot.go                  |   3 +
 banyand/trace/block.go                             |  40 +--
 banyand/trace/block_test.go                        |   3 +-
 banyand/trace/merger.go                            |   2 +-
 banyand/trace/snapshot.go                          |   1 -
 test/stress/stream-vs-trace/data_generator.go      |  25 +-
 test/stress/stream-vs-trace/docker/.gitignore      |   7 +-
 test/stress/stream-vs-trace/docker/README.md       |   2 +-
 .../stream-vs-trace/docker/client_wrappers.go      |  24 +-
 .../stress/stream-vs-trace/docker/collect-stats.sh | 238 ++++++++++++++++
 test/stress/stream-vs-trace/docker/docker_test.go  |  17 +-
 .../stream-vs-trace/docker/example-stats-output.md | 129 +++++++++
 .../stream-vs-trace/docker/run-docker-test.sh      |  51 +++-
 .../stress/stream-vs-trace/docker/schema_client.go |  26 +-
 test/stress/stream-vs-trace/metrics.go             |   3 +-
 30 files changed, 581 insertions(+), 433 deletions(-)

diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md
deleted file mode 100644
index bf4e8c6f..00000000
--- a/banyand/internal/sidx/TODO.md
+++ /dev/null
@@ -1,301 +0,0 @@
-# SIDX Implementation TODO List
-
-This document tracks the implementation progress of the Secondary Index File 
System (sidx)
-
-## Implementation Progress Overview
-
-**Completed Phases (33 tasks)**: ✅
-- Phase 1: Core Data Structures (6 tasks)
-- Phase 2: Interface Definitions (5 tasks)
-- Phase 3: Mock Implementations (4 tasks)
-- Phase 4: Memory Management (4 tasks)
-- Phase 5: Snapshot Management (6 tasks)
-- Phase 6: Query Path (5 tasks) - All query components completed
-
-**Remaining Phases**:
-- [ ] **Phase 7**: Flush Operations (4 tasks)
-- [ ] **Phase 8**: Merge Operations (4 tasks)
-- [ ] **Phase 9**: Testing (4 tasks)
-
-**Total Tasks**: 40 (33 completed, 7 remaining)
-
----
-
----
-
-## Phase 6: Query Path (Following Stream Architecture)
-
-### 6.1 Part Iterator (`part_iter.go` or extend `part.go`)
-- [x] **Implementation Tasks**:
-  - [x] Create `partIter` struct for single part iteration
-  - [x] Implement `init(part, seriesIDs, minKey, maxKey, filter)`
-  - [x] Add `nextBlock() bool` method for block advancement
-  - [x] Create `curBlock` access and `error()` handling
-- [x] **Test Cases**:
-  - [x] Part iteration finds all matching blocks in correct order
-  - [x] Key range filtering works at block level
-  - [x] Error handling for corrupted parts
-  - [x] Performance meets single-part iteration targets
-
-### 6.2 Multi-Part Iterator (`iter.go` - like stream's `tstIter`)
-- [x] **Implementation Tasks**:
-  - [x] Create `iter` struct with `partIterHeap` for ordering
-  - [x] Implement `init(parts, seriesIDs, minKey, maxKey, filter)`
-  - [x] Add `nextBlock() bool` with heap-based merge logic
-  - [x] Create `Error()` method for aggregated error handling
-- [x] **Test Cases**:
-  - [x] Multi-part ordering maintained across parts
-  - [x] Heap operations preserve key ordering (ASC/DESC)
-  - [x] Iterator handles empty parts gracefully
-  - [x] Memory usage during multi-part iteration is controlled
-
-### 6.3 Block Scanner (`block_scanner.go` - like stream's block_scanner)
-- [x] **Implementation Tasks**:
-  - [x] Create `blockScanner` struct using `iter` for block access
-  - [x] Implement `scan(ctx, blockCh chan *blockScanResultBatch)`
-  - [x] Add batch processing with `blockScanResultBatch` pattern
-  - [x] Create element-level filtering and tag matching
-- [x] **Test Cases**:
-  - [x] Batch processing maintains correct element ordering
-  - [x] Memory quota management prevents OOM
-  - [x] Tag filtering accuracy with bloom filters
-  - [x] Worker coordination and error propagation
-
-### 6.4 Query Result (`query_result.go` - like stream's `tsResult`)
-- [x] **Implementation Tasks**:
-  - [x] Create `queryResult` struct implementing `QueryResult` interface
-  - [x] Implement `Pull() *QueryResponse` with worker pool pattern
-  - [x] Add `runBlockScanner(ctx)` for parallel processing
-  - [x] Create `Release()` for resource cleanup
-- [x] **Test Cases**:
-  - [x] Pull/Release pattern prevents resource leaks
-  - [x] Parallel workers maintain result ordering
-  - [x] Result merging produces correct final output
-  - [x] Performance scales with worker count
-
-### 6.5 SIDX Query Interface (extend `sidx.go`)
-- [x] **Implementation Tasks**:
-  - [x] Implement `Query(ctx context.Context, req QueryRequest) (QueryResult, 
error)`
-  - [x] Add query validation and snapshot acquisition
-  - [x] Create part selection by key range overlap
-  - [x] Integrate with existing sidx snapshot management
-- [x] **Test Cases**:
-  - [x] Query validation catches invalid requests
-  - [x] Part selection optimizes I/O by filtering non-overlapping parts
-  - [x] Integration with snapshot reference counting works correctly
-  - [x] End-to-end query performance meets targets
-
----
-
-## Phase 7: Flush Operations
-
-### 7.1 Flusher Interface (`flusher.go`)
-- [ ] Simple Flush() method for user control
-- [ ] Internal part selection logic
-- [ ] Error handling and retry mechanisms
-- [ ] **Test Cases**:
-  - [ ] Flush API works reliably
-  - [ ] State management during flush operations
-  - [ ] Error handling for flush failures
-  - [ ] Concurrent flush operations are handled safely
-
-### 7.2 Flush to Disk (`flusher.go`)
-- [ ] Create part directories with epoch names
-- [ ] Write all part files atomically
-- [ ] Implement crash recovery mechanisms
-- [ ] **Test Cases**:
-  - [ ] File creation follows naming conventions
-  - [ ] Atomic operations prevent partial writes
-  - [ ] Crash recovery restores consistent state
-  - [ ] Disk space management during flush
-
-### 7.3 Tag File Writing (`flusher.go`)
-- [ ] Write individual tag files (not families)
-- [ ] Generate bloom filters for indexed tags
-- [ ] Optimize file layout for query performance
-- [ ] **Test Cases**:
-  - [ ] Tag file integrity after write
-  - [ ] Bloom filter accuracy for lookups
-  - [ ] File format compatibility
-  - [ ] Performance of tag file generation
-
-### 7.4 Block Serialization (`flusher.go` + `block_writer.go`)
-- [ ] **Implementation Tasks**:
-  - [ ] Integrate block writer into flush operations
-  - [ ] Add primary.bin writing with block metadata
-  - [ ] Implement compression for block data
-  - [ ] Add file reference management
-- [ ] **Test Cases**:
-  - [ ] Block persistence maintains data integrity
-  - [ ] Compression reduces storage footprint
-  - [ ] Read-back verification after flush
-  - [ ] Performance of block serialization
-
----
-
-## Phase 8: Merge Operations
-
-### 8.1 Merger Interface (`merger.go`)
-- [ ] Simple Merge() method for user control
-- [ ] Internal merge strategy implementation
-- [ ] Resource management during merge
-- [ ] **Test Cases**:
-  - [ ] Merge API responds correctly
-  - [ ] Error handling for merge failures
-  - [ ] Resource usage during merge operations
-  - [ ] Concurrent merge safety
-
-### 8.2 Part Selection (`merger.go`)
-- [ ] Select parts by size/age criteria
-- [ ] Avoid merging recent parts
-- [ ] Optimize merge efficiency
-- [ ] **Test Cases**:
-  - [ ] Selection algorithm chooses optimal parts
-  - [ ] Edge cases (no parts to merge, single part)
-  - [ ] Selection criteria can be tuned
-  - [ ] Selection performance is acceptable
-
-### 8.3 Merged Part Writer (`merger.go`)
-- [ ] Combine parts maintaining key order
-- [ ] Deduplicate overlapping data
-- [ ] Generate merged part metadata
-- [ ] **Test Cases**:
-  - [ ] Merge correctness preserves all data
-  - [ ] Deduplication removes redundant entries
-  - [ ] Key ordering is maintained across parts
-  - [ ] Merged part metadata is accurate
-
-### 8.4 Block Merging (`merger.go` + `block.go`)
-- [ ] **Implementation Tasks**:
-  - [ ] Integrate block reader for loading blocks from parts
-  - [ ] Add block merging logic with ordering preservation
-  - [ ] Implement merged block creation
-  - [ ] Add memory-efficient merge processing
-- [ ] **Test Cases**:
-  - [ ] Block merge correctness across parts
-  - [ ] Ordering preservation during merge
-  - [ ] Memory efficiency during block merge
-  - [ ] Performance of block merge operations
-
----
-
-## Phase 9: Testing
-
-### 9.1 Unit Tests
-- [ ] **Test block.go**: Block creation, initialization, validation
-- [ ] Test all components individually
-- [ ] Achieve >90% code coverage
-- [ ] **Test Cases**:
-  - [ ] Component functionality works in isolation
-  - [ ] Edge cases are handled correctly
-  - [ ] Error conditions produce expected results
-  - [ ] Performance characteristics meet requirements
-
-### 9.2 Integration Tests
-- [ ] End-to-end workflow testing
-- [ ] Write-flush-merge-query cycles
-- [ ] Multi-component interaction verification
-- [ ] **Test Cases**:
-  - [ ] Full workflows complete successfully
-  - [ ] Component interactions work correctly
-  - [ ] Data consistency maintained throughout
-  - [ ] Performance acceptable under realistic loads
-
-### 9.3 Performance Benchmarks
-- [ ] **Benchmark block operations**: Creation, serialization, scanning
-- [ ] Throughput and latency measurements
-- [ ] Memory usage profiling
-- [ ] **Test Cases**:
-  - [ ] Performance regression detection
-  - [ ] Throughput meets design targets
-  - [ ] Latency remains within bounds
-  - [ ] Memory usage is reasonable
-
-### 9.4 Concurrency Tests
-- [ ] Race condition detection with race detector
-- [ ] Stress testing with concurrent operations
-- [ ] Deadlock prevention verification
-- [ ] **Test Cases**:
-  - [ ] Thread safety under concurrent load
-  - [ ] No deadlocks in normal operation
-  - [ ] Data races are eliminated
-  - [ ] System stability under stress
-
----
-
-## Remaining File Creation Checklist
-
-### Core Implementation Files (Remaining)
-- [ ] `block_reader.go` - Block deserialization 🔥
-- [ ] `block_scanner.go` - Block scanning for queries 🔥
-- [ ] `flusher.go` - Flush operations
-- [ ] `merger.go` - Merge operations
-- [ ] `query.go` - Query interface and execution
-
-### Test Files (Remaining)
-- [ ] `flusher_test.go` - Flusher testing
-- [ ] `merger_test.go` - Merger testing
-- [ ] `writer_test.go` - Writer testing
-- [ ] `query_test.go` - Query testing
-- [ ] `benchmark_test.go` - Performance benchmarks
-- [ ] `integration_test.go` - Integration tests
-- [ ] `concurrency_test.go` - Concurrency tests
-
-### Completed Files ✅
-**Core Implementation**: `sidx.go`, `element.go`, `tag.go`, `part.go`, 
`part_wrapper.go`, `mempart.go`, `block.go`, `block_writer.go`, `snapshot.go`, 
`introducer.go`, `metadata.go`, `options.go`
-
-**Test Files**: `sidx_test.go`, `element_test.go`, `tag_test.go`, 
`part_test.go`, `block_test.go`, `snapshot_test.go`, `introducer_test.go`
-
----
-
-## Success Criteria for Remaining Phases
-
-### Phase Completion Requirements
-- [ ] All tasks in phase completed
-- [ ] Unit tests pass with >90% coverage
-- [ ] Integration tests pass for phase functionality
-- [ ] No race conditions detected
-- [ ] Performance benchmarks meet targets
-- [ ] Clean linter output (`make lint`)
-- [ ] Documentation updated
-
-### Overall Success Criteria
-- [x] Phases 1-6 completed (33/40 tasks) ✅
-- [ ] Remaining 7 tasks completed
-- [ ] Full test suite passes
-- [ ] Performance meets design targets
-- [ ] Code review approval
-- [ ] Documentation complete
-- [ ] Ready for production use
-
----
-
-## Block.go Usage Summary 🔥
-
-The `block.go` file is central to the SIDX implementation and will be used in 
remaining phases:
-
-**Completed Usage** ✅:
-1. **Phase 1.4**: Block structure creation and initialization
-2. **Phase 4.2**: Block writer serialization  
-3. **Phase 4.4**: Block initialization from elements
-
-**Remaining Usage**:
-5. **Phase 6.3**: Block scanner reads blocks during queries
-6. **Phase 6.5**: Block reader deserializes blocks
-7. **Phase 7.4**: Serialize blocks to disk during flush
-8. **Phase 8.4**: Merge blocks from multiple parts
-9. **Phase 9.1**: Unit tests for block operations
-10. **Phase 9.3**: Performance benchmarks for block operations
-
----
-
-## Dependencies Between Remaining Tasks
-
-**Completed Foundation** ✅:
-- Phases 1-5 provide all core data structures, interfaces, memory management, 
and snapshot management
-
-**Remaining Dependencies**:
-- **Phase 6** can be developed independently (queries work with existing 
persisted data)
-- **Phase 7** requires completed snapshot management from Phase 5 ✅ 
-- **Phase 8** requires Phase 7 completion (flush needed for merge)
-- **Phase 9** requires completion of relevant phases for testing
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index 55dd22f3..bfd592b3 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -245,6 +245,7 @@ func (b *block) mustWriteTo(sid common.SeriesID, bm 
*blockMetadata, ww *writers)
 
        // Write user keys to keys.bin and capture encoding information
        bm.keysEncodeType, bm.minKey = mustWriteKeysTo(&bm.keysBlock, 
b.userKeys, &ww.keysWriter)
+       bm.maxKey = b.userKeys[len(b.userKeys)-1]
 
        // Write data payloads to data.bin
        mustWriteDataTo(&bm.dataBlock, b.data, &ww.dataWriter)
@@ -516,6 +517,9 @@ func (b *block) mustSeqReadFrom(decoder 
*encoding.BytesBlockDecoder, sr *seqRead
 }
 
 func (b *block) readUserKeys(sr *seqReaders, bm *blockMetadata) error {
+       if bm.keysBlock.offset != sr.keys.bytesRead {
+               logger.Panicf("offset %d must be equal to bytesRead %d", 
bm.keysBlock.offset, sr.keys.bytesRead)
+       }
        bb := bigValuePool.Get()
        if bb == nil {
                bb = &bytes.Buffer{}
@@ -524,7 +528,7 @@ func (b *block) readUserKeys(sr *seqReaders, bm 
*blockMetadata) error {
                bb.Buf = bb.Buf[:0]
                bigValuePool.Put(bb)
        }()
-       bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(bm.keysBlock.size))
+       bb.Buf = bytes.ResizeExact(bb.Buf[:0], int(bm.keysBlock.size))
        sr.keys.mustReadFull(bb.Buf)
        var err error
        b.userKeys, err = encoding.BytesToInt64List(b.userKeys[:0], bb.Buf, 
bm.keysEncodeType, bm.minKey, int(bm.count))
@@ -535,6 +539,9 @@ func (b *block) readUserKeys(sr *seqReaders, bm 
*blockMetadata) error {
 }
 
 func (b *block) readData(decoder *encoding.BytesBlockDecoder, sr *seqReaders, 
bm *blockMetadata) error {
+       if bm.dataBlock.offset != sr.data.bytesRead {
+               logger.Panicf("offset %d must be equal to bytesRead %d", 
bm.dataBlock.offset, sr.data.bytesRead)
+       }
        bb := bigValuePool.Get()
        if bb == nil {
                bb = &bytes.Buffer{}
@@ -543,7 +550,7 @@ func (b *block) readData(decoder 
*encoding.BytesBlockDecoder, sr *seqReaders, bm
                bb.Buf = bb.Buf[:0]
                bigValuePool.Put(bb)
        }()
-       bb.Buf = bytes.ResizeOver(bb.Buf, int(bm.dataBlock.size))
+       bb.Buf = bytes.ResizeExact(bb.Buf, int(bm.dataBlock.size))
        sr.data.mustReadFull(bb.Buf)
 
        var err error
@@ -567,6 +574,9 @@ func (b *block) readTagData(decoder 
*encoding.BytesBlockDecoder, sr *seqReaders,
 }
 
 func (b *block) readSingleTag(decoder *encoding.BytesBlockDecoder, sr 
*seqReaders, tagName string, tagBlock *dataBlock, count int) error {
+       if tagBlock.offset != sr.tagMetadata[tagName].bytesRead {
+               logger.Panicf("offset %d must be equal to bytesRead %d", 
tagBlock.offset, sr.tagMetadata[tagName].bytesRead)
+       }
        tmReader, tmExists := sr.tagMetadata[tagName]
        if !tmExists {
                return fmt.Errorf("tag metadata reader not found for tag %s", 
tagName)
@@ -585,7 +595,7 @@ func (b *block) readSingleTag(decoder 
*encoding.BytesBlockDecoder, sr *seqReader
                bigValuePool.Put(bb)
        }()
 
-       bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(tagBlock.size))
+       bb.Buf = bytes.ResizeExact(bb.Buf[:0], int(tagBlock.size))
        tmReader.mustReadFull(bb.Buf)
        tm, err := unmarshalTagMetadata(bb.Buf)
        if err != nil {
@@ -593,7 +603,7 @@ func (b *block) readSingleTag(decoder 
*encoding.BytesBlockDecoder, sr *seqReader
        }
        defer releaseTagMetadata(tm)
 
-       bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(tm.dataBlock.size))
+       bb.Buf = bytes.ResizeExact(bb.Buf[:0], int(tm.dataBlock.size))
        tdReader.mustReadFull(bb.Buf)
        td := generateTagData()
        td.name = tagName
diff --git a/banyand/internal/sidx/block_writer.go 
b/banyand/internal/sidx/block_writer.go
index 88aea021..75645b4f 100644
--- a/banyand/internal/sidx/block_writer.go
+++ b/banyand/internal/sidx/block_writer.go
@@ -329,8 +329,6 @@ func (bw *blockWriter) mustWriteBlock(sid common.SeriesID, 
b *block) {
        bw.totalBlocksCount++
 
        // Serialize block metadata
-       bm.setSeriesID(sid)
-       bm.setKeyRange(minKey, maxKey)
        bw.primaryBlockData = bm.marshal(bw.primaryBlockData)
 
        if len(bw.primaryBlockData) > maxUncompressedPrimaryBlockSize {
@@ -343,8 +341,7 @@ func (bw *blockWriter) mustWriteBlock(sid common.SeriesID, 
b *block) {
 func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) {
        if len(data) > 0 {
                bw.primaryBlockMetadata.mustWriteBlock(data, bw.sidFirst, 
bw.minKey, bw.maxKey, &bw.writers)
-               bmData := bw.primaryBlockMetadata.marshal(bw.metaData[:0])
-               bw.metaData = append(bw.metaData, bmData...)
+               bw.metaData = bw.primaryBlockMetadata.marshal(bw.metaData)
        }
        bw.hasWrittenBlocks = false
        bw.minKey = 0
diff --git a/banyand/internal/sidx/interfaces.go 
b/banyand/internal/sidx/interfaces.go
index 8b61c802..411c9e2a 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -53,6 +53,8 @@ type SIDX interface {
        Flush() error
        // Merge merges the specified parts into a new part.
        Merge(closeCh <-chan struct{}) error
+       // MergeMemParts merges the memory parts into a new part.
+       MergeMemParts(closeCh <-chan struct{}) error
        // PartsToSync returns the parts to sync.
        PartsToSync() []*part
        // StreamingParts returns the streaming parts.
diff --git a/banyand/internal/sidx/introducer.go 
b/banyand/internal/sidx/introducer.go
index 34e6a8ed..c87bd163 100644
--- a/banyand/internal/sidx/introducer.go
+++ b/banyand/internal/sidx/introducer.go
@@ -80,7 +80,7 @@ func releaseFlusherIntroduction(i *flusherIntroduction) {
 
 type mergerIntroduction struct {
        merged  map[uint64]struct{}
-       newPart *part
+       newPart *partWrapper
        applied chan struct{}
 }
 
@@ -229,8 +229,7 @@ func (s *sidx) introduceMerged(nextIntroduction 
*mergerIntroduction, epoch uint6
        nextSnp := cur.remove(epoch, nextIntroduction.merged)
 
        // Wrap the new part
-       pw := newPartWrapper(nil, nextIntroduction.newPart)
-       nextSnp.parts = append(nextSnp.parts, pw)
+       nextSnp.parts = append(nextSnp.parts, nextIntroduction.newPart)
 
        s.replaceSnapshot(nextSnp)
        s.persistSnapshot(nextSnp)
diff --git a/banyand/internal/sidx/introducer_test.go 
b/banyand/internal/sidx/introducer_test.go
index f601e995..a8651496 100644
--- a/banyand/internal/sidx/introducer_test.go
+++ b/banyand/internal/sidx/introducer_test.go
@@ -88,7 +88,7 @@ func TestIntroductionPooling(t *testing.T) {
                for i := 0; i < 10; i++ {
                        intro := generateMergerIntroduction()
                        intro.merged[uint64(i)] = struct{}{}
-                       intro.newPart = &part{}
+                       intro.newPart = &partWrapper{}
                        intro.applied = make(chan struct{})
                        intros = append(intros, intro)
                }
@@ -141,7 +141,7 @@ func TestIntroductionReset(t *testing.T) {
                // Set up merger introduction with data
                intro.merged[1] = struct{}{}
                intro.merged[2] = struct{}{}
-               intro.newPart = &part{}
+               intro.newPart = &partWrapper{}
                intro.applied = make(chan struct{})
 
                // Reset the merger introduction
@@ -331,7 +331,7 @@ func TestConcurrentPoolAccess(t *testing.T) {
                                for j := 0; j < operationsPerGoroutine; j++ {
                                        intro := generateMergerIntroduction()
                                        intro.merged[uint64(j)] = struct{}{}
-                                       intro.newPart = &part{}
+                                       intro.newPart = &partWrapper{}
                                        intro.applied = make(chan struct{})
                                        releaseMergerIntroduction(intro)
                                }
diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go
index 4f348ae5..9acbdcd3 100644
--- a/banyand/internal/sidx/merge.go
+++ b/banyand/internal/sidx/merge.go
@@ -69,7 +69,55 @@ func (s *sidx) Merge(closeCh <-chan struct{}) error {
        if err != nil {
                return err
        }
-       mergeIntro.newPart = newPart.p
+       mergeIntro.newPart = newPart
+
+       // Send to introducer loop
+       s.mergeCh <- mergeIntro
+
+       // Wait for merge to complete
+       <-mergeIntro.applied
+
+       return nil
+}
+
+func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error {
+       snap := s.currentSnapshot()
+       if snap == nil {
+               return nil
+       }
+       defer snap.decRef()
+
+       // Create merge introduction
+       mergeIntro := generateMergerIntroduction()
+       defer releaseMergerIntroduction(mergeIntro)
+       mergeIntro.applied = make(chan struct{})
+
+       // Select parts to merge (all active memory parts)
+       var partsToMerge []*partWrapper
+       for _, pw := range snap.parts {
+               if pw.isActive() && pw.isMemPart() {
+                       partsToMerge = append(partsToMerge, pw)
+               }
+       }
+
+       if len(partsToMerge) < 2 {
+               return nil
+       }
+
+       // Mark parts for merging
+       for _, pw := range partsToMerge {
+               mergeIntro.merged[pw.ID()] = struct{}{}
+       }
+
+       // Generate new part ID using atomic increment
+       newPartID := atomic.AddUint64(&s.curPartID, 1)
+
+       // Create new merged part
+       newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge, 
newPartID, s.root)
+       if err != nil {
+               return err
+       }
+       mergeIntro.newPart = newPart
 
        // Send to introducer loop
        s.mergeCh <- mergeIntro
diff --git a/banyand/internal/sidx/metadata.go 
b/banyand/internal/sidx/metadata.go
index b60197ae..ba4a6f5f 100644
--- a/banyand/internal/sidx/metadata.go
+++ b/banyand/internal/sidx/metadata.go
@@ -402,18 +402,6 @@ func (bm *blockMetadata) TagsBlocks() map[string]dataBlock 
{
        return bm.tagsBlocks
 }
 
-// setSeriesID sets the seriesID of the block.
-func (bm *blockMetadata) setSeriesID(seriesID common.SeriesID) {
-       bm.seriesID = seriesID
-}
-
-// setKeyRange sets the key range of the block.
-func (bm *blockMetadata) setKeyRange(minKey, maxKey int64) {
-       bm.minKey = minKey
-       bm.maxKey = maxKey
-}
-
-// setDataBlock sets the data block reference.
 func (bm *blockMetadata) setDataBlock(offset, size uint64) {
        bm.dataBlock = dataBlock{offset: offset, size: size}
 }
diff --git a/banyand/internal/sidx/metadata_test.go 
b/banyand/internal/sidx/metadata_test.go
index 3d25ef96..248bec42 100644
--- a/banyand/internal/sidx/metadata_test.go
+++ b/banyand/internal/sidx/metadata_test.go
@@ -587,10 +587,11 @@ func TestBlockMetadata_SetterMethods(t *testing.T) {
        defer releaseBlockMetadata(bm)
 
        // Test setter methods
-       bm.setSeriesID(common.SeriesID(456))
+       bm.seriesID = common.SeriesID(456)
        assert.Equal(t, common.SeriesID(456), bm.seriesID)
 
-       bm.setKeyRange(20, 200)
+       bm.minKey = 20
+       bm.maxKey = 200
        assert.Equal(t, int64(20), bm.minKey)
        assert.Equal(t, int64(200), bm.maxKey)
 
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go 
b/banyand/internal/sidx/multi_sidx_query_test.go
index 155c06bf..3249a50a 100644
--- a/banyand/internal/sidx/multi_sidx_query_test.go
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -66,6 +66,10 @@ func (m *mockSIDX) Merge(_ <-chan struct{}) error {
        return nil
 }
 
+func (m *mockSIDX) MergeMemParts(_ <-chan struct{}) error {
+       return nil
+}
+
 func (m *mockSIDX) PartsToSync() []*part {
        return nil
 }
diff --git a/banyand/internal/sidx/part_iter.go 
b/banyand/internal/sidx/part_iter.go
index 7b095848..d49e8611 100644
--- a/banyand/internal/sidx/part_iter.go
+++ b/banyand/internal/sidx/part_iter.go
@@ -268,7 +268,6 @@ type partMergeIter struct {
        compressedPrimaryBuf []byte
        primaryBuf           []byte
        block                blockPointer
-       partID               uint64
        primaryMetadataIdx   int
 }
 
@@ -277,7 +276,6 @@ func (pmi *partMergeIter) reset() {
        pmi.seqReaders.reset()
        pmi.primaryBlockMetadata = nil
        pmi.primaryMetadataIdx = 0
-       pmi.partID = 0
        pmi.primaryBuf = pmi.primaryBuf[:0]
        pmi.compressedPrimaryBuf = pmi.compressedPrimaryBuf[:0]
        pmi.block.reset()
@@ -287,7 +285,6 @@ func (pmi *partMergeIter) mustInitFromPart(p *part) {
        pmi.reset()
        pmi.seqReaders.init(p)
        pmi.primaryBlockMetadata = p.primaryBlockMetadata
-       pmi.partID = p.partMetadata.ID
 }
 
 func (pmi *partMergeIter) error() error {
diff --git a/banyand/internal/sidx/part_wrapper.go 
b/banyand/internal/sidx/part_wrapper.go
index 0ca41099..37199f07 100644
--- a/banyand/internal/sidx/part_wrapper.go
+++ b/banyand/internal/sidx/part_wrapper.go
@@ -54,22 +54,11 @@ func (s partWrapperState) String() string {
 // It enables safe concurrent access to parts while managing their lifecycle.
 // When the reference count reaches zero, the underlying part is cleaned up.
 type partWrapper struct {
-       // p is the underlying part. It can be nil for memory parts.
-       p *part
-
-       // mp is the memory part. It can be nil for file-based parts.
-       mp *memPart
-
-       // ref is the atomic reference counter.
-       // It starts at 1 when the wrapper is created.
-       ref int32
-
-       // state tracks the lifecycle state of the part.
-       // State transitions: active -> removing -> removed
-       state int32
-
-       // removable indicates if the part should be removed from disk when 
dereferenced.
-       // This is typically true for parts that have been merged or are no 
longer needed.
+       p         *part
+       mp        *memPart
+       snapshot  []uint64
+       ref       int32
+       state     int32
        removable atomic.Bool
 }
 
diff --git a/banyand/internal/sidx/part_wrapper_test.go 
b/banyand/internal/sidx/part_wrapper_test.go
index 8951ee7c..4ca6af43 100644
--- a/banyand/internal/sidx/part_wrapper_test.go
+++ b/banyand/internal/sidx/part_wrapper_test.go
@@ -206,7 +206,7 @@ func TestPartWrapper_NilPart(t *testing.T) {
        require.NotNil(t, pw)
 
        assert.Equal(t, int32(1), pw.refCount())
-       assert.Equal(t, uint64(0), pw.ID()) // Should return 0 for nil part
+       assert.Nil(t, pw.p)
        assert.True(t, pw.isActive())
 
        // Test acquire/release with nil part
diff --git a/banyand/internal/sidx/query_result.go 
b/banyand/internal/sidx/query_result.go
index 09692a2f..290c8938 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -70,6 +70,7 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p 
*part, bm *blockMetadata
        var err error
        tmpBlock.userKeys, err = 
encoding.BytesToInt64List(tmpBlock.userKeys[:0], bb.Buf, bm.keysEncodeType, 
bm.minKey, int(bm.count))
        if err != nil {
+               logger.Panicf("cannot decode user keys: %v", err)
                return false
        }
 
@@ -90,6 +91,7 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p 
*part, bm *blockMetadata
        decoder := &encoding.BytesBlockDecoder{}
        tmpBlock.data, err = decoder.Decode(tmpBlock.data[:0], bb2.Buf, 
bm.count)
        if err != nil {
+               logger.Panicf("cannot decode data payloads: %v", err)
                return false
        }
 
@@ -156,6 +158,7 @@ func (qr *queryResult) loadTagData(tmpBlock *block, p 
*part, tagName string, tag
 
        tm, err := unmarshalTagMetadata(bb.Buf)
        if err != nil {
+               logger.Panicf("cannot unmarshal tag metadata: %v", err)
                return false
        }
        defer releaseTagMetadata(tm)
@@ -178,6 +181,7 @@ func (qr *queryResult) loadTagData(tmpBlock *block, p 
*part, tagName string, tag
        // Decode tag values directly (no compression)
        td.values, err = internalencoding.DecodeTagValues(td.values[:0], 
decoder, bb2, tm.valueType, count)
        if err != nil {
+               logger.Panicf("cannot decode tag values: %v", err)
                return false
        }
 
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 352df8f3..14f3b8ec 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -453,8 +453,9 @@ func (s *sidx) Flush() error {
                }
                partPath := partPath(s.root, pw.ID())
                pw.mp.mustFlush(s.fileSystem, partPath)
-               p := mustOpenPart(partPath, s.fileSystem)
-               flushIntro.flushed[p.partMetadata.ID] = pw
+               newPW := newPartWrapper(nil, mustOpenPart(partPath, 
s.fileSystem))
+               newPW.p.partMetadata.ID = pw.ID()
+               flushIntro.flushed[newPW.ID()] = newPW
        }
 
        if len(flushIntro.flushed) == 0 {
diff --git a/banyand/internal/sidx/snapshot.go 
b/banyand/internal/sidx/snapshot.go
index e99182d6..5fdbf51e 100644
--- a/banyand/internal/sidx/snapshot.go
+++ b/banyand/internal/sidx/snapshot.go
@@ -277,6 +277,7 @@ func (s *snapshot) copyAllTo(epoch uint64) *snapshot {
        for _, pw := range result.parts {
                if pw != nil {
                        pw.acquire()
+                       pw.snapshot = append(pw.snapshot, epoch)
                }
        }
 
@@ -291,10 +292,12 @@ func (s *snapshot) merge(nextEpoch uint64, nextParts 
map[uint64]*partWrapper) *s
        for i := 0; i < len(s.parts); i++ {
                if n, ok := nextParts[s.parts[i].ID()]; ok {
                        result.parts = append(result.parts, n)
+                       n.snapshot = append(n.snapshot, nextEpoch)
                        continue
                }
                if s.parts[i].acquire() {
                        result.parts = append(result.parts, s.parts[i])
+                       s.parts[i].snapshot = append(s.parts[i].snapshot, 
nextEpoch)
                }
        }
        return &result
diff --git a/banyand/trace/block.go b/banyand/trace/block.go
index 9ba12bee..22a47f80 100644
--- a/banyand/trace/block.go
+++ b/banyand/trace/block.go
@@ -223,7 +223,7 @@ func (b *block) spanSize() uint64 {
 func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm 
blockMetadata) {
        b.reset()
 
-       b.spans = mustReadSpansFrom(b.spans, bm.spans, int(bm.count), p.spans)
+       b.spans = mustReadSpansFrom(decoder, b.spans, bm.spans, int(bm.count), 
p.spans)
 
        b.resizeTags(len(bm.tagProjection.Names))
        for i, name := range bm.tagProjection.Names {
@@ -244,7 +244,7 @@ func (b *block) mustReadFrom(decoder 
*encoding.BytesBlockDecoder, p *part, bm bl
 func (b *block) mustSeqReadFrom(decoder *encoding.BytesBlockDecoder, 
seqReaders *seqReaders, bm blockMetadata) {
        b.reset()
 
-       b.spans = mustSeqReadSpansFrom(b.spans, bm.spans, int(bm.count), 
&seqReaders.spans)
+       b.spans = mustSeqReadSpansFrom(decoder, b.spans, bm.spans, 
int(bm.count), &seqReaders.spans)
 
        b.resizeTags(len(bm.tags))
        keys := make([]string, 0, len(bm.tags))
@@ -275,36 +275,26 @@ func mustWriteSpansTo(sm *dataBlock, spans [][]byte, 
spanWriter *writer) {
        defer bigValuePool.Release(bb)
 
        sm.offset = spanWriter.bytesWritten
-       for _, span := range spans {
-               bb.Buf = encoding.VarUint64ToBytes(bb.Buf, uint64(len(span)))
-               bb.Buf = append(bb.Buf, span...)
-       }
+       bb.Buf = encoding.EncodeBytesBlock(bb.Buf, spans)
        sm.size = uint64(len(bb.Buf))
 
        spanWriter.MustWrite(bb.Buf)
 }
 
-func mustReadSpansFrom(spans [][]byte, sm *dataBlock, count int, reader 
fs.Reader) [][]byte {
+func mustReadSpansFrom(decoder *encoding.BytesBlockDecoder, spans [][]byte, sm 
*dataBlock, count int, reader fs.Reader) [][]byte {
        bb := bigValuePool.Generate()
        defer bigValuePool.Release(bb)
        bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(sm.size))
        fs.MustReadData(reader, int64(sm.offset), bb.Buf)
-
-       src := bb.Buf
        spans = resizeSpans(spans, count)
-       var spanLen uint64
-       for i := 0; i < count; i++ {
-               src, spanLen = encoding.BytesToVarUint64(src)
-               if uint64(len(src)) < spanLen {
-                       logger.Panicf("insufficient data for span: need %d 
bytes, have %d", spanLen, len(src))
-               }
-               spans[i] = append(spans[i], src[:spanLen]...)
-               src = src[spanLen:]
+       spans, err := decoder.Decode(spans[:0], bb.Buf, uint64(count))
+       if err != nil {
+               logger.Panicf("cannot decode spans: %v", err)
        }
        return spans
 }
 
-func mustSeqReadSpansFrom(spans [][]byte, sm *dataBlock, count int, reader 
*seqReader) [][]byte {
+func mustSeqReadSpansFrom(decoder *encoding.BytesBlockDecoder, spans [][]byte, 
sm *dataBlock, count int, reader *seqReader) [][]byte {
        if sm.offset != reader.bytesRead {
                logger.Panicf("offset %d must be equal to bytesRead %d", 
sm.offset, reader.bytesRead)
        }
@@ -312,17 +302,9 @@ func mustSeqReadSpansFrom(spans [][]byte, sm *dataBlock, 
count int, reader *seqR
        defer bigValuePool.Release(bb)
        bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(sm.size))
        reader.mustReadFull(bb.Buf)
-
-       src := bb.Buf
-       spans = resizeSpans(spans, count)
-       var spanLen uint64
-       for i := 0; i < count; i++ {
-               src, spanLen = encoding.BytesToVarUint64(src)
-               if uint64(len(src)) < spanLen {
-                       logger.Panicf("insufficient data for span: need %d 
bytes, have %d", spanLen, len(src))
-               }
-               spans[i] = append(spans[i], src[:spanLen]...)
-               src = src[spanLen:]
+       spans, err := decoder.Decode(spans[:0], bb.Buf, uint64(count))
+       if err != nil {
+               logger.Panicf("cannot decode spans: %v", err)
        }
        return spans
 }
diff --git a/banyand/trace/block_test.go b/banyand/trace/block_test.go
index 398bc36f..7984a44e 100644
--- a/banyand/trace/block_test.go
+++ b/banyand/trace/block_test.go
@@ -225,6 +225,7 @@ func Test_mustWriteAndReadSpans(t *testing.T) {
                        spans: [][]byte{[]byte("span1"), []byte("span2"), 
[]byte("span3")},
                },
        }
+       decoder := &encoding.BytesBlockDecoder{}
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        defer func() {
@@ -238,7 +239,7 @@ func Test_mustWriteAndReadSpans(t *testing.T) {
                        w := new(writer)
                        w.init(b)
                        mustWriteSpansTo(sm, tt.spans, w)
-                       spans := mustReadSpansFrom(nil, sm, len(tt.spans), b)
+                       spans := mustReadSpansFrom(decoder, nil, sm, 
len(tt.spans), b)
                        if !reflect.DeepEqual(spans, tt.spans) {
                                t.Errorf("mustReadSpansFrom() spans = %v, want 
%v", spans, tt.spans)
                        }
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index a19400b8..148aa6fa 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -113,7 +113,7 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator 
snapshotCreator, part
                return nil, err
        }
        for _, sidxInstance := range tst.getAllSidx() {
-               if err := sidxInstance.Merge(closeCh); err != nil {
+               if err := sidxInstance.MergeMemParts(closeCh); err != nil {
                        tst.l.Warn().Err(err).Msg("sidx merge failed")
                        return nil, err
                }
diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go
index 6f873ffe..2019335a 100644
--- a/banyand/trace/snapshot.go
+++ b/banyand/trace/snapshot.go
@@ -128,7 +128,6 @@ func (s *snapshot) remove(nextEpoch uint64, merged 
map[partHandle]struct{}) snap
                s.parts[i].removable.Store(true)
                removedCount++
        }
-       logger.Infof("removed %d parts, merged %d parts", removedCount, 
len(merged))
        return result
 }
 
diff --git a/test/stress/stream-vs-trace/data_generator.go 
b/test/stress/stream-vs-trace/data_generator.go
index 09081e5d..6a39cd86 100644
--- a/test/stress/stream-vs-trace/data_generator.go
+++ b/test/stress/stream-vs-trace/data_generator.go
@@ -20,7 +20,9 @@ package streamvstrace
 import (
        "context"
        "crypto/rand"
+       "errors"
        "fmt"
+       "io"
        "math"
        "math/big"
        "strings"
@@ -744,11 +746,15 @@ func (c *StreamClient) WriteStreamData(ctx 
context.Context, spans []*SpanData) e
        }
 
        // Wait for final response
-       _, err = stream.Recv()
-       if err != nil && err.Error() != "EOF" {
-               return fmt.Errorf("failed to receive final response: %w", err)
+       for i := 0; i < len(spans); i++ {
+               _, err := stream.Recv()
+               if errors.Is(err, io.EOF) {
+                       break
+               }
+               if err != nil {
+                       return fmt.Errorf("failed to receive final response: 
%w", err)
+               }
        }
-
        return nil
 }
 
@@ -774,9 +780,14 @@ func (c *TraceClient) WriteTraceData(ctx context.Context, 
spans []*SpanData) err
        }
 
        // Wait for final response
-       _, err = stream.Recv()
-       if err != nil && err.Error() != "EOF" {
-               return fmt.Errorf("failed to receive final response: %w", err)
+       for i := 0; i < len(spans); i++ {
+               _, err := stream.Recv()
+               if errors.Is(err, io.EOF) {
+                       break
+               }
+               if err != nil {
+                       return fmt.Errorf("failed to receive final response: 
%w", err)
+               }
        }
 
        return nil
diff --git a/test/stress/stream-vs-trace/docker/.gitignore 
b/test/stress/stream-vs-trace/docker/.gitignore
index fadb27cc..d21a26c8 100644
--- a/test/stress/stream-vs-trace/docker/.gitignore
+++ b/test/stress/stream-vs-trace/docker/.gitignore
@@ -11,4 +11,9 @@ trace-data/
 coverage.html
 
 # Binary files
-*.test
\ No newline at end of file
+*.test
+
+# Stats files
+container-stats.json
+performance-summary.txt
+stats_pid
diff --git a/test/stress/stream-vs-trace/docker/README.md 
b/test/stress/stream-vs-trace/docker/README.md
index 3e528ecb..049ebd9e 100644
--- a/test/stress/stream-vs-trace/docker/README.md
+++ b/test/stress/stream-vs-trace/docker/README.md
@@ -54,7 +54,7 @@ Each container is configured with:
 ## Prerequisites
 
 - Docker and Docker Compose
-- Go 1.23 or later
+- Go 1.25 or later
 - GNU Make
 
 ## Usage
diff --git a/test/stress/stream-vs-trace/docker/client_wrappers.go 
b/test/stress/stream-vs-trace/docker/client_wrappers.go
index 8e786c49..18f6e4a9 100644
--- a/test/stress/stream-vs-trace/docker/client_wrappers.go
+++ b/test/stress/stream-vs-trace/docker/client_wrappers.go
@@ -19,34 +19,34 @@ package docker
 
 import (
        "google.golang.org/grpc"
-       
+
        streamvstrace 
"github.com/apache/skywalking-banyandb/test/stress/stream-vs-trace"
 )
 
-// DockerStreamClient wraps StreamClient with connection exposed
-type DockerStreamClient struct {
+// StreamClient wraps streamvstrace.StreamClient with the underlying gRPC connection exposed.
+type StreamClient struct {
        *streamvstrace.StreamClient
        conn *grpc.ClientConn
 }
 
-// NewDockerStreamClient creates a new DockerStreamClient instance
-func NewDockerStreamClient(conn *grpc.ClientConn) *DockerStreamClient {
-       return &DockerStreamClient{
+// NewStreamClient creates a new StreamClient instance.
+func NewStreamClient(conn *grpc.ClientConn) *StreamClient {
+       return &StreamClient{
                StreamClient: streamvstrace.NewStreamClient(conn),
                conn:         conn,
        }
 }
 
-// DockerTraceClient wraps TraceClient with connection exposed
-type DockerTraceClient struct {
+// TraceClient wraps streamvstrace.TraceClient with the underlying gRPC connection exposed.
+type TraceClient struct {
        *streamvstrace.TraceClient
        conn *grpc.ClientConn
 }
 
-// NewDockerTraceClient creates a new DockerTraceClient instance
-func NewDockerTraceClient(conn *grpc.ClientConn) *DockerTraceClient {
-       return &DockerTraceClient{
+// NewTraceClient creates a new TraceClient instance.
+func NewTraceClient(conn *grpc.ClientConn) *TraceClient {
+       return &TraceClient{
                TraceClient: streamvstrace.NewTraceClient(conn),
                conn:        conn,
        }
-}
\ No newline at end of file
+}
diff --git a/test/stress/stream-vs-trace/docker/collect-stats.sh 
b/test/stress/stream-vs-trace/docker/collect-stats.sh
new file mode 100755
index 00000000..018caec2
--- /dev/null
+++ b/test/stress/stream-vs-trace/docker/collect-stats.sh
@@ -0,0 +1,238 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file to
+# you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Container stats collection script
+# Collects performance metrics during test execution
+
+set -e
+
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+STATS_FILE="$SCRIPT_DIR/container-stats.json"
+SUMMARY_FILE="$SCRIPT_DIR/performance-summary.txt"
+
+# Colors for output
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+NC='\033[0m' # No Color
+
+# Container names
+STREAM_CONTAINER="banyandb-stream"
+TRACE_CONTAINER="banyandb-trace"
+
+# Function to check if containers exist
+check_containers() {
+    if ! docker ps --format "table {{.Names}}" | grep -q "$STREAM_CONTAINER"; 
then
+        echo -e "${YELLOW}Warning: Stream container ($STREAM_CONTAINER) not 
found${NC}"
+        return 1
+    fi
+    
+    if ! docker ps --format "table {{.Names}}" | grep -q "$TRACE_CONTAINER"; 
then
+        echo -e "${YELLOW}Warning: Trace container ($TRACE_CONTAINER) not 
found${NC}"
+        return 1
+    fi
+    
+    return 0
+}
+
+# Function to collect stats for a single container
+collect_container_stats() {
+    local container_name=$1
+    local timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
+    
+    # Get container stats in JSON format
+    local stats=$(docker stats --no-stream --format "json" "$container_name" 
2>/dev/null || echo "{}")
+    
+    # Add timestamp and container name
+    echo "$stats" | jq --arg timestamp "$timestamp" --arg container 
"$container_name" \
+        '. + {timestamp: $timestamp, container: $container}' 2>/dev/null || 
echo "{}"
+}
+
+# Function to start continuous stats collection
+start_stats_collection() {
+    echo -e "${GREEN}Starting container stats collection...${NC}"
+    
+    # Initialize stats file
+    echo "[]" > "$STATS_FILE"
+    
+    # Check if containers exist
+    if ! check_containers; then
+        echo -e "${YELLOW}Some containers not found, stats collection may be 
incomplete${NC}"
+    fi
+    
+    # Start background collection process
+    (
+        while true; do
+            # Collect stats for both containers
+            local stream_stats=$(collect_container_stats "$STREAM_CONTAINER")
+            local trace_stats=$(collect_container_stats "$TRACE_CONTAINER")
+            
+            # Add to stats file
+            if [ "$stream_stats" != "{}" ] || [ "$trace_stats" != "{}" ]; then
+                local temp_file=$(mktemp)
+                jq --argjson stream "$stream_stats" --argjson trace 
"$trace_stats" \
+                    '. + [$stream, $trace]' "$STATS_FILE" > "$temp_file" 
2>/dev/null || echo "[]" > "$temp_file"
+                mv "$temp_file" "$STATS_FILE"
+            fi
+            
+            sleep 5  # Collect every 5 seconds
+        done
+    ) &
+    
+    echo $! > "$SCRIPT_DIR/stats_pid"
+    echo -e "${GREEN}Stats collection started (PID: $(cat 
$SCRIPT_DIR/stats_pid))${NC}"
+}
+
+# Function to stop stats collection
+stop_stats_collection() {
+    if [ -f "$SCRIPT_DIR/stats_pid" ]; then
+        local pid=$(cat "$SCRIPT_DIR/stats_pid")
+        if kill -0 "$pid" 2>/dev/null; then
+            kill "$pid"
+            echo -e "${GREEN}Stats collection stopped${NC}"
+        fi
+        rm -f "$SCRIPT_DIR/stats_pid"
+    fi
+}
+
+# Function to generate performance summary
+generate_summary() {
+    echo -e "${GREEN}Generating performance summary...${NC}"
+    
+    if [ ! -f "$STATS_FILE" ] || [ ! -s "$STATS_FILE" ]; then
+        echo -e "${YELLOW}No stats data found${NC}"
+        return 1
+    fi
+    
+    # Create summary file
+    cat > "$SUMMARY_FILE" << EOF
+========================================
+Container Performance Summary
+========================================
+Generated: $(date)
+Test Duration: $(jq -r '.[0].timestamp + " to " + .[-1].timestamp' 
"$STATS_FILE" 2>/dev/null || echo "Unknown")
+
+EOF
+
+    # Process stats for each container
+    for container in "$STREAM_CONTAINER" "$TRACE_CONTAINER"; do
+        echo "Processing stats for $container..."
+        
+        # Filter stats for this container
+        local container_stats=$(jq --arg container "$container" '.[] | 
select(.container == $container)' "$STATS_FILE")
+        
+        if [ -z "$container_stats" ] || [ "$container_stats" = "" ]; then
+            echo -e "${YELLOW}No stats found for $container${NC}"
+            continue
+        fi
+        
+        # Calculate metrics
+        local cpu_avg=$(echo "$container_stats" | jq -r '.CPUPerc' | sed 
's/%//' | awk '{sum+=$1; count++} END {if(count>0) print sum/count; else print 
0}')
+        local cpu_max=$(echo "$container_stats" | jq -r '.CPUPerc' | sed 
's/%//' | awk 'BEGIN{max=0} {if($1>max) max=$1} END {print max}')
+        local mem_avg=$(echo "$container_stats" | jq -r '.MemUsage' | awk 
-F'/' '{print $1}' | sed 's/[^0-9.]//g' | awk '{sum+=$1; count++} END 
{if(count>0) print sum/count; else print 0}')
+        local mem_max=$(echo "$container_stats" | jq -r '.MemUsage' | awk 
-F'/' '{print $1}' | sed 's/[^0-9.]//g' | awk 'BEGIN{max=0} {if($1>max) max=$1} 
END {print max}')
+        local mem_limit=$(echo "$container_stats" | jq -r '.MemUsage' | awk 
-F'/' '{print $2}' | head -1 | sed 's/[^0-9.]//g')
+        local net_rx=$(echo "$container_stats" | jq -r '.NetIO' | awk -F'/' 
'{print $1}' | head -1)
+        local net_tx=$(echo "$container_stats" | jq -r '.NetIO' | awk -F'/' 
'{print $2}' | head -1)
+        local block_read=$(echo "$container_stats" | jq -r '.BlockIO' | awk 
-F'/' '{print $1}' | head -1)
+        local block_write=$(echo "$container_stats" | jq -r '.BlockIO' | awk 
-F'/' '{print $2}' | head -1)
+        local sample_count=$(echo "$container_stats" | jq -s 'length')
+        
+        # Add to summary
+        cat >> "$SUMMARY_FILE" << EOF
+
+----------------------------------------
+$container Performance Metrics
+----------------------------------------
+Sample Count: $sample_count
+
+CPU Usage:
+  Average: ${cpu_avg}%
+  Maximum: ${cpu_max}%
+
+Memory Usage:
+  Average: ${mem_avg}MB
+  Maximum: ${mem_max}MB
+  Limit: ${mem_limit}MB
+  Usage %: $(echo "scale=2; $mem_avg * 100 / $mem_limit" | bc 2>/dev/null || 
echo "N/A")%
+
+Network I/O:
+  Received: $net_rx
+  Transmitted: $net_tx
+
+Block I/O:
+  Read: $block_read
+  Write: $block_write
+
+EOF
+    done
+    
+    # Add comparison section
+    cat >> "$SUMMARY_FILE" << EOF
+
+----------------------------------------
+Performance Comparison
+----------------------------------------
+EOF
+
+    # Compare CPU usage
+    local stream_cpu_avg=$(jq --arg container "$STREAM_CONTAINER" '.[] | 
select(.container == $container) | .CPUPerc' "$STATS_FILE" | sed 's/%//' | awk 
'{sum+=$1; count++} END {if(count>0) print sum/count; else print 0}')
+    local trace_cpu_avg=$(jq --arg container "$TRACE_CONTAINER" '.[] | 
select(.container == $container) | .CPUPerc' "$STATS_FILE" | sed 's/%//' | awk 
'{sum+=$1; count++} END {if(count>0) print sum/count; else print 0}')
+    
+    if [ "$stream_cpu_avg" != "0" ] && [ "$trace_cpu_avg" != "0" ]; then
+        local cpu_ratio=$(echo "scale=2; $stream_cpu_avg / $trace_cpu_avg" | 
bc 2>/dev/null || echo "N/A")
+        echo "CPU Usage Ratio (Stream/Trace): $cpu_ratio" >> "$SUMMARY_FILE"
+    fi
+    
+    # Compare Memory usage
+    local stream_mem_avg=$(jq --arg container "$STREAM_CONTAINER" '.[] | 
select(.container == $container) | .MemUsage' "$STATS_FILE" | awk -F'/' '{print 
$1}' | sed 's/[^0-9.]//g' | awk '{sum+=$1; count++} END {if(count>0) print 
sum/count; else print 0}')
+    local trace_mem_avg=$(jq --arg container "$TRACE_CONTAINER" '.[] | 
select(.container == $container) | .MemUsage' "$STATS_FILE" | awk -F'/' '{print 
$1}' | sed 's/[^0-9.]//g' | awk '{sum+=$1; count++} END {if(count>0) print 
sum/count; else print 0}')
+    
+    if [ "$stream_mem_avg" != "0" ] && [ "$trace_mem_avg" != "0" ]; then
+        local mem_ratio=$(echo "scale=2; $stream_mem_avg / $trace_mem_avg" | 
bc 2>/dev/null || echo "N/A")
+        echo "Memory Usage Ratio (Stream/Trace): $mem_ratio" >> "$SUMMARY_FILE"
+    fi
+    
+    echo -e "${GREEN}Performance summary saved to: $SUMMARY_FILE${NC}"
+    echo -e "${GREEN}Raw stats data saved to: $STATS_FILE${NC}"
+}
+
+# Function to cleanup
+cleanup() {
+    stop_stats_collection
+    rm -f "$SCRIPT_DIR/stats_pid"
+}
+
+# Main execution
+case "${1:-start}" in
+    start)
+        start_stats_collection
+        ;;
+    stop)
+        stop_stats_collection
+        ;;
+    summary)
+        generate_summary
+        ;;
+    cleanup)
+        cleanup
+        ;;
+    *)
+        echo "Usage: $0 {start|stop|summary|cleanup}"
+        exit 1
+        ;;
+esac
diff --git a/test/stress/stream-vs-trace/docker/docker_test.go 
b/test/stress/stream-vs-trace/docker/docker_test.go
index f5a170ce..80ba6c1b 100644
--- a/test/stress/stream-vs-trace/docker/docker_test.go
+++ b/test/stress/stream-vs-trace/docker/docker_test.go
@@ -21,7 +21,6 @@ package docker
 import (
        "context"
        "fmt"
-       "os"
        "testing"
        "time"
 
@@ -38,9 +37,6 @@ import (
 )
 
 func TestStreamVsTraceDocker(t *testing.T) {
-       if os.Getenv("DOCKER_TEST") != "true" {
-               t.Skip("Skipping Docker test. Set DOCKER_TEST=true to run.")
-       }
        gomega.RegisterFailHandler(g.Fail)
        g.RunSpecs(t, "Stream vs Trace Performance Docker Suite", 
g.Label("docker", "performance", "slow"))
 }
@@ -55,8 +51,8 @@ var _ = g.Describe("Stream vs Trace Performance Docker", 
func() {
 
        g.It("should run performance comparison using Docker containers", 
func() {
                // Define connection addresses for the two containers
-               streamAddr := "localhost:17912"  // Stream container gRPC port
-               traceAddr := "localhost:27912"   // Trace container gRPC port
+               streamAddr := "localhost:17912" // Stream container gRPC port
+               traceAddr := "localhost:27912"  // Trace container gRPC port
 
                // Wait for both containers to be ready
                g.By("Waiting for Stream container to be ready")
@@ -77,8 +73,8 @@ var _ = g.Describe("Stream vs Trace Performance Docker", 
func() {
                defer traceConn.Close()
 
                // Create Docker clients with exposed connections
-               streamClient := NewDockerStreamClient(streamConn)
-               traceClient := NewDockerTraceClient(traceConn)
+               streamClient := NewStreamClient(streamConn)
+               traceClient := NewTraceClient(traceConn)
 
                // Create context for operations
                ctx := context.Background()
@@ -128,8 +124,7 @@ var _ = g.Describe("Stream vs Trace Performance Docker", 
func() {
        })
 })
 
-// loadSchemasToContainer loads the required schemas into both containers
-func loadSchemasToContainer(ctx context.Context, streamClient 
*DockerStreamClient, traceClient *DockerTraceClient) error {
+func loadSchemasToContainer(ctx context.Context, streamClient *StreamClient, 
traceClient *TraceClient) error {
        // Create schema clients for both connections
        streamSchemaClient := NewSchemaClient(streamClient.conn)
        traceSchemaClient := NewSchemaClient(traceClient.conn)
@@ -145,4 +140,4 @@ func loadSchemasToContainer(ctx context.Context, 
streamClient *DockerStreamClien
        }
 
        return nil
-}
\ No newline at end of file
+}
diff --git a/test/stress/stream-vs-trace/docker/example-stats-output.md 
b/test/stress/stream-vs-trace/docker/example-stats-output.md
new file mode 100644
index 00000000..16aa8f2f
--- /dev/null
+++ b/test/stress/stream-vs-trace/docker/example-stats-output.md
@@ -0,0 +1,129 @@
+# Container Performance Monitoring Example
+
+This document shows an example of the performance data collected during the 
Stream vs Trace Docker test.
+
+## Generated Files
+
+After running the test, the following files are generated:
+
+1. **`container-stats.json`** - Raw performance data in JSON format
+2. **`performance-summary.txt`** - Human-readable performance summary
+
+## Example Performance Summary
+
+```
+========================================
+Container Performance Summary
+========================================
+Generated: 2024-01-15 10:30:45
+Test Duration: 2024-01-15T10:25:30Z to 2024-01-15T10:30:45Z
+
+----------------------------------------
+banyandb-stream Performance Metrics
+----------------------------------------
+Sample Count: 60
+
+CPU Usage:
+  Average: 45.2%
+  Maximum: 78.5%
+
+Memory Usage:
+  Average: 1250.5MB
+  Maximum: 1450.2MB
+  Limit: 4096MB
+  Usage %: 30.5%
+
+Network I/O:
+  Received: 125.4MB
+  Transmitted: 89.7MB
+
+Block I/O:
+  Read: 45.2MB
+  Write: 12.8MB
+
+----------------------------------------
+banyandb-trace Performance Metrics
+----------------------------------------
+Sample Count: 60
+
+CPU Usage:
+  Average: 52.8%
+  Maximum: 85.2%
+
+Memory Usage:
+  Average: 1380.3MB
+  Maximum: 1620.1MB
+  Limit: 4096MB
+  Usage %: 33.7%
+
+Network I/O:
+  Received: 98.7MB
+  Transmitted: 76.3MB
+
+Block I/O:
+  Read: 38.9MB
+  Write: 15.2MB
+
+----------------------------------------
+Performance Comparison
+----------------------------------------
+CPU Usage Ratio (Stream/Trace): 0.86
+Memory Usage Ratio (Stream/Trace): 0.91
+```
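+
+(With these example numbers, the CPU Usage Ratio is 45.2 / 52.8 ≈ 0.86 and the Memory Usage Ratio is 1250.5 / 1380.3 ≈ 0.91; a ratio below 1 means the Stream container consumed less of that resource on average than the Trace container.)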
+
+## Data Points Collected
+
+The monitoring system collects the following performance metrics every 5 seconds (a sketch of one way such values can be sampled follows the metric lists below):
+
+### CPU Metrics
+- **Average CPU Usage**: Mean CPU utilization percentage
+- **Maximum CPU Usage**: Peak CPU utilization during test
+- **CPU Usage Ratio**: The Stream container's average CPU usage divided by the Trace container's
+
+### Memory Metrics
+- **Average Memory Usage**: Mean memory consumption in MB
+- **Maximum Memory Usage**: Peak memory consumption during test
+- **Memory Limit**: Container memory limit
+- **Memory Usage Percentage**: Average memory usage as a percentage of the container memory limit
+- **Memory Usage Ratio**: The Stream container's average memory usage divided by the Trace container's
+
+### Network I/O Metrics
+- **Network Received**: Total data received over the network
+- **Network Transmitted**: Total data transmitted over the network
+
+### Block I/O Metrics
+- **Block Read**: Total data read from disk
+- **Block Write**: Total data written to disk
+
+### Sample Information
+- **Sample Count**: Number of data points collected
+- **Test Duration**: Start and end timestamps of monitoring
+
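+As a rough illustration only (this is not how `collect-stats.sh` is implemented), the following Go sketch shows one way to sample `docker stats --format '{{json .}}'` every 5 seconds and derive the average and maximum CPU figures reported above; the container name, the 60-sample loop, and the `statsLine` type are assumptions made for this example:
+
+```go
+// Minimal sketch: poll `docker stats` for one container every 5 seconds
+// and track average/maximum CPU usage. Assumes the `docker` CLI is on PATH.
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"os/exec"
+	"strconv"
+	"strings"
+	"time"
+)
+
+// statsLine mirrors a subset of the fields printed by
+// `docker stats --format '{{json .}}'`.
+type statsLine struct {
+	Name    string `json:"Name"`
+	CPUPerc string `json:"CPUPerc"` // e.g. "45.20%"
+}
+
+func main() {
+	var samples int
+	var sumCPU, maxCPU float64
+	for i := 0; i < 60; i++ { // 60 samples at 5s intervals ≈ 5 minutes
+		out, err := exec.Command("docker", "stats", "--no-stream",
+			"--format", "{{json .}}", "banyandb-stream").Output()
+		if err != nil {
+			fmt.Println("docker stats failed:", err)
+			return
+		}
+		var s statsLine
+		if err := json.Unmarshal(out, &s); err != nil {
+			continue
+		}
+		cpu, err := strconv.ParseFloat(strings.TrimSuffix(s.CPUPerc, "%"), 64)
+		if err != nil {
+			continue
+		}
+		sumCPU += cpu
+		if cpu > maxCPU {
+			maxCPU = cpu
+		}
+		samples++
+		time.Sleep(5 * time.Second)
+	}
+	if samples == 0 {
+		fmt.Println("no samples collected")
+		return
+	}
+	fmt.Printf("Sample Count: %d\nCPU Average: %.1f%%\nCPU Maximum: %.1f%%\n",
+		samples, sumCPU/float64(samples), maxCPU)
+}
+```
+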
+## Usage
+
+To run the test with performance monitoring:
+
+```bash
+# Run complete test with monitoring
+./run-docker-test.sh all
+
+# Or run just the test
+./run-docker-test.sh test
+
+# View performance summary from last test
+./run-docker-test.sh summary
+
+# View current container stats
+./run-docker-test.sh stats
+```
+
+## Analysis
+
+The performance summary provides insights into:
+
+1. **Resource Efficiency**: Which container (Stream vs Trace) uses resources more efficiently
+2. **Performance Characteristics**: CPU and memory usage patterns during different test phases
+3. **I/O Patterns**: Network and disk I/O behavior differences
+4. **Scalability**: How resource usage scales with workload
+
+This data helps in understanding the performance characteristics of the Stream and Trace models under load.
diff --git a/test/stress/stream-vs-trace/docker/run-docker-test.sh b/test/stress/stream-vs-trace/docker/run-docker-test.sh
index b4596668..6394e6d2 100755
--- a/test/stress/stream-vs-trace/docker/run-docker-test.sh
+++ b/test/stress/stream-vs-trace/docker/run-docker-test.sh
@@ -51,6 +51,7 @@ show_usage() {
     echo "  logs      Show container logs"
     echo "  ps        Show container status"
     echo "  stats     Show container resource usage"
+    echo "  summary   Show performance summary from last test"
     echo "  all       Run complete test (build, up, test, down)"
     echo "  help      Show this help message"
     echo ""
@@ -96,21 +97,42 @@ do_test() {
     # Set environment variable to enable Docker test
     export DOCKER_TEST=true
     
-    # Run the test from the docker directory to ensure relative paths work
+    # Start container stats collection
+    echo -e "${GREEN}Starting container performance monitoring...${NC}"
     cd "$SCRIPT_DIR"
+    ./collect-stats.sh start
+    
+    # Run the test from the docker directory to ensure relative paths work
     go test -v -timeout 30m ./... -run TestStreamVsTraceDocker
     
+    # Stop stats collection and generate summary
+    echo -e "${GREEN}Stopping performance monitoring...${NC}"
+    ./collect-stats.sh stop
+    ./collect-stats.sh summary
+    
     echo -e "${GREEN}Test completed successfully!${NC}"
     
-    # Optional: Show container resource usage
-    echo -e "\n${YELLOW}Container Resource Usage:${NC}"
+    # Show final container resource usage
+    echo -e "\n${YELLOW}Final Container Resource Usage:${NC}"
     docker stats --no-stream banyandb-stream banyandb-trace
+    
+    # Display performance summary
+    if [ -f "performance-summary.txt" ]; then
+        echo -e "\n${GREEN}Performance Summary:${NC}"
+        cat performance-summary.txt
+    fi
 }
 
 # Function to stop containers
 do_down() {
     echo -e "${YELLOW}Stopping containers...${NC}"
     cd "$SCRIPT_DIR"
+    
+    # Stop stats collection if running
+    if [ -f "collect-stats.sh" ]; then
+        ./collect-stats.sh stop 2>/dev/null || true
+    fi
+    
     docker compose down
     echo -e "${GREEN}Containers stopped.${NC}"
 }
@@ -119,6 +141,15 @@ do_down() {
 do_clean() {
     echo -e "${YELLOW}Cleaning up everything...${NC}"
     cd "$SCRIPT_DIR"
+    
+    # Stop stats collection and cleanup
+    if [ -f "collect-stats.sh" ]; then
+        ./collect-stats.sh cleanup 2>/dev/null || true
+    fi
+    
+    # Remove stats files
+    rm -f container-stats.json performance-summary.txt stats_pid
+    
     docker compose down -v
     docker compose rm -f
     echo -e "${GREEN}Cleanup complete.${NC}"
@@ -141,6 +172,17 @@ do_stats() {
     docker stats --no-stream banyandb-stream banyandb-trace
 }
 
+# Function to show performance summary
+do_summary() {
+    cd "$SCRIPT_DIR"
+    if [ -f "performance-summary.txt" ]; then
+        echo -e "${GREEN}Performance Summary from Last Test:${NC}"
+        cat performance-summary.txt
+    else
+        echo -e "${YELLOW}No performance summary found. Run a test first.${NC}"
+    fi
+}
+
 # Function to run all steps
 do_all() {
     echo -e "${GREEN}Stream vs Trace Performance Test - Complete Workflow${NC}"
@@ -206,6 +248,9 @@ case $COMMAND in
     stats)
         do_stats
         ;;
+    summary)
+        do_summary
+        ;;
     all)
         do_all
         ;;
diff --git a/test/stress/stream-vs-trace/docker/schema_client.go b/test/stress/stream-vs-trace/docker/schema_client.go
index ad403d2c..52b5882d 100644
--- a/test/stress/stream-vs-trace/docker/schema_client.go
+++ b/test/stress/stream-vs-trace/docker/schema_client.go
@@ -35,27 +35,27 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-// SchemaClient handles schema operations via gRPC
+// SchemaClient handles schema operations via gRPC.
 type SchemaClient struct {
-       groupClient       databasev1.GroupRegistryServiceClient
-       streamClient      databasev1.StreamRegistryServiceClient
-       traceClient       databasev1.TraceRegistryServiceClient
-       indexRuleClient   databasev1.IndexRuleRegistryServiceClient
+       groupClient        databasev1.GroupRegistryServiceClient
+       streamClient       databasev1.StreamRegistryServiceClient
+       traceClient        databasev1.TraceRegistryServiceClient
+       indexRuleClient    databasev1.IndexRuleRegistryServiceClient
        indexBindingClient databasev1.IndexRuleBindingRegistryServiceClient
 }
 
-// NewSchemaClient creates a new schema client
+// NewSchemaClient creates a new schema client.
 func NewSchemaClient(conn *grpc.ClientConn) *SchemaClient {
        return &SchemaClient{
-               groupClient:       databasev1.NewGroupRegistryServiceClient(conn),
-               streamClient:      databasev1.NewStreamRegistryServiceClient(conn),
-               traceClient:       databasev1.NewTraceRegistryServiceClient(conn),
-               indexRuleClient:   databasev1.NewIndexRuleRegistryServiceClient(conn),
+               groupClient:        databasev1.NewGroupRegistryServiceClient(conn),
+               streamClient:       databasev1.NewStreamRegistryServiceClient(conn),
+               traceClient:        databasev1.NewTraceRegistryServiceClient(conn),
+               indexRuleClient:    databasev1.NewIndexRuleRegistryServiceClient(conn),
                indexBindingClient: databasev1.NewIndexRuleBindingRegistryServiceClient(conn),
        }
 }
 
-// LoadStreamSchemas loads all stream-related schemas
+// LoadStreamSchemas loads all stream-related schemas.
 func (s *SchemaClient) LoadStreamSchemas(ctx context.Context) error {
        // Load stream group
        if err := s.loadStreamGroup(ctx); err != nil {
@@ -80,7 +80,7 @@ func (s *SchemaClient) LoadStreamSchemas(ctx context.Context) error {
        return nil
 }
 
-// LoadTraceSchemas loads all trace-related schemas
+// LoadTraceSchemas loads all trace-related schemas.
 func (s *SchemaClient) LoadTraceSchemas(ctx context.Context) error {
        // Load trace group
        if err := s.loadTraceGroup(ctx); err != nil {
@@ -369,4 +369,4 @@ func (s *SchemaClient) loadTraceIndexRuleBindings(ctx context.Context) error {
        }
 
        return nil
-}
\ No newline at end of file
+}
diff --git a/test/stress/stream-vs-trace/metrics.go b/test/stress/stream-vs-trace/metrics.go
index c35f9d7f..8abb6259 100644
--- a/test/stress/stream-vs-trace/metrics.go
+++ b/test/stress/stream-vs-trace/metrics.go
@@ -2,7 +2,8 @@
 // license agreements. See the NOTICE file distributed with
 // this work for additional information regarding copyright
 // ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the License.
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
 //     http://www.apache.org/licenses/LICENSE-2.0
