This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch perf/trace-vs-stream
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5e9fe68b5131c641c0226e8ed250d9b5f2452abb Author: Gao Hongtao <[email protected]> AuthorDate: Mon Sep 29 14:33:08 2025 +0000 refactor: streamline block metadata handling and improve memory management - Removed unnecessary setter methods for series ID and key range in blockMetadata, directly accessing fields instead. - Updated blockWriter to marshal metadata more efficiently during block writing. - Enhanced error handling in block reading functions to ensure offsets match expected values. - Refactored span reading and writing functions to utilize a decoder for improved performance and clarity. - Introduced new methods for merging memory parts in the SIDX interface to enhance part management. --- banyand/internal/sidx/TODO.md | 301 --------------------- banyand/internal/sidx/block.go | 18 +- banyand/internal/sidx/block_writer.go | 5 +- banyand/internal/sidx/interfaces.go | 2 + banyand/internal/sidx/introducer.go | 5 +- banyand/internal/sidx/introducer_test.go | 6 +- banyand/internal/sidx/merge.go | 50 +++- banyand/internal/sidx/metadata.go | 12 - banyand/internal/sidx/metadata_test.go | 5 +- banyand/internal/sidx/multi_sidx_query_test.go | 4 + banyand/internal/sidx/part_iter.go | 3 - banyand/internal/sidx/part_wrapper.go | 21 +- banyand/internal/sidx/part_wrapper_test.go | 2 +- banyand/internal/sidx/query_result.go | 4 + banyand/internal/sidx/sidx.go | 5 +- banyand/internal/sidx/snapshot.go | 3 + banyand/trace/block.go | 40 +-- banyand/trace/block_test.go | 3 +- banyand/trace/merger.go | 2 +- banyand/trace/snapshot.go | 1 - test/stress/stream-vs-trace/data_generator.go | 25 +- test/stress/stream-vs-trace/docker/.gitignore | 7 +- test/stress/stream-vs-trace/docker/README.md | 2 +- .../stream-vs-trace/docker/client_wrappers.go | 24 +- .../stress/stream-vs-trace/docker/collect-stats.sh | 238 ++++++++++++++++ test/stress/stream-vs-trace/docker/docker_test.go | 17 +- .../stream-vs-trace/docker/example-stats-output.md | 129 +++++++++ .../stream-vs-trace/docker/run-docker-test.sh | 51 +++- .../stress/stream-vs-trace/docker/schema_client.go | 26 +- test/stress/stream-vs-trace/metrics.go | 3 +- 30 files changed, 581 insertions(+), 433 deletions(-) diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md deleted file mode 100644 index bf4e8c6f..00000000 --- a/banyand/internal/sidx/TODO.md +++ /dev/null @@ -1,301 +0,0 @@ -# SIDX Implementation TODO List - -This document tracks the implementation progress of the Secondary Index File System (sidx) - -## Implementation Progress Overview - -**Completed Phases (33 tasks)**: ✅ -- Phase 1: Core Data Structures (6 tasks) -- Phase 2: Interface Definitions (5 tasks) -- Phase 3: Mock Implementations (4 tasks) -- Phase 4: Memory Management (4 tasks) -- Phase 5: Snapshot Management (6 tasks) -- Phase 6: Query Path (5 tasks) - All query components completed - -**Remaining Phases**: -- [ ] **Phase 7**: Flush Operations (4 tasks) -- [ ] **Phase 8**: Merge Operations (4 tasks) -- [ ] **Phase 9**: Testing (4 tasks) - -**Total Tasks**: 40 (33 completed, 7 remaining) - ---- - ---- - -## Phase 6: Query Path (Following Stream Architecture) - -### 6.1 Part Iterator (`part_iter.go` or extend `part.go`) -- [x] **Implementation Tasks**: - - [x] Create `partIter` struct for single part iteration - - [x] Implement `init(part, seriesIDs, minKey, maxKey, filter)` - - [x] Add `nextBlock() bool` method for block advancement - - [x] Create `curBlock` access and `error()` handling -- [x] **Test Cases**: - - [x] Part iteration finds all matching blocks in correct 
order - - [x] Key range filtering works at block level - - [x] Error handling for corrupted parts - - [x] Performance meets single-part iteration targets - -### 6.2 Multi-Part Iterator (`iter.go` - like stream's `tstIter`) -- [x] **Implementation Tasks**: - - [x] Create `iter` struct with `partIterHeap` for ordering - - [x] Implement `init(parts, seriesIDs, minKey, maxKey, filter)` - - [x] Add `nextBlock() bool` with heap-based merge logic - - [x] Create `Error()` method for aggregated error handling -- [x] **Test Cases**: - - [x] Multi-part ordering maintained across parts - - [x] Heap operations preserve key ordering (ASC/DESC) - - [x] Iterator handles empty parts gracefully - - [x] Memory usage during multi-part iteration is controlled - -### 6.3 Block Scanner (`block_scanner.go` - like stream's block_scanner) -- [x] **Implementation Tasks**: - - [x] Create `blockScanner` struct using `iter` for block access - - [x] Implement `scan(ctx, blockCh chan *blockScanResultBatch)` - - [x] Add batch processing with `blockScanResultBatch` pattern - - [x] Create element-level filtering and tag matching -- [x] **Test Cases**: - - [x] Batch processing maintains correct element ordering - - [x] Memory quota management prevents OOM - - [x] Tag filtering accuracy with bloom filters - - [x] Worker coordination and error propagation - -### 6.4 Query Result (`query_result.go` - like stream's `tsResult`) -- [x] **Implementation Tasks**: - - [x] Create `queryResult` struct implementing `QueryResult` interface - - [x] Implement `Pull() *QueryResponse` with worker pool pattern - - [x] Add `runBlockScanner(ctx)` for parallel processing - - [x] Create `Release()` for resource cleanup -- [x] **Test Cases**: - - [x] Pull/Release pattern prevents resource leaks - - [x] Parallel workers maintain result ordering - - [x] Result merging produces correct final output - - [x] Performance scales with worker count - -### 6.5 SIDX Query Interface (extend `sidx.go`) -- [x] **Implementation Tasks**: - - [x] Implement `Query(ctx context.Context, req QueryRequest) (QueryResult, error)` - - [x] Add query validation and snapshot acquisition - - [x] Create part selection by key range overlap - - [x] Integrate with existing sidx snapshot management -- [x] **Test Cases**: - - [x] Query validation catches invalid requests - - [x] Part selection optimizes I/O by filtering non-overlapping parts - - [x] Integration with snapshot reference counting works correctly - - [x] End-to-end query performance meets targets - ---- - -## Phase 7: Flush Operations - -### 7.1 Flusher Interface (`flusher.go`) -- [ ] Simple Flush() method for user control -- [ ] Internal part selection logic -- [ ] Error handling and retry mechanisms -- [ ] **Test Cases**: - - [ ] Flush API works reliably - - [ ] State management during flush operations - - [ ] Error handling for flush failures - - [ ] Concurrent flush operations are handled safely - -### 7.2 Flush to Disk (`flusher.go`) -- [ ] Create part directories with epoch names -- [ ] Write all part files atomically -- [ ] Implement crash recovery mechanisms -- [ ] **Test Cases**: - - [ ] File creation follows naming conventions - - [ ] Atomic operations prevent partial writes - - [ ] Crash recovery restores consistent state - - [ ] Disk space management during flush - -### 7.3 Tag File Writing (`flusher.go`) -- [ ] Write individual tag files (not families) -- [ ] Generate bloom filters for indexed tags -- [ ] Optimize file layout for query performance -- [ ] **Test Cases**: - - [ ] Tag file integrity after 
write - - [ ] Bloom filter accuracy for lookups - - [ ] File format compatibility - - [ ] Performance of tag file generation - -### 7.4 Block Serialization (`flusher.go` + `block_writer.go`) -- [ ] **Implementation Tasks**: - - [ ] Integrate block writer into flush operations - - [ ] Add primary.bin writing with block metadata - - [ ] Implement compression for block data - - [ ] Add file reference management -- [ ] **Test Cases**: - - [ ] Block persistence maintains data integrity - - [ ] Compression reduces storage footprint - - [ ] Read-back verification after flush - - [ ] Performance of block serialization - ---- - -## Phase 8: Merge Operations - -### 8.1 Merger Interface (`merger.go`) -- [ ] Simple Merge() method for user control -- [ ] Internal merge strategy implementation -- [ ] Resource management during merge -- [ ] **Test Cases**: - - [ ] Merge API responds correctly - - [ ] Error handling for merge failures - - [ ] Resource usage during merge operations - - [ ] Concurrent merge safety - -### 8.2 Part Selection (`merger.go`) -- [ ] Select parts by size/age criteria -- [ ] Avoid merging recent parts -- [ ] Optimize merge efficiency -- [ ] **Test Cases**: - - [ ] Selection algorithm chooses optimal parts - - [ ] Edge cases (no parts to merge, single part) - - [ ] Selection criteria can be tuned - - [ ] Selection performance is acceptable - -### 8.3 Merged Part Writer (`merger.go`) -- [ ] Combine parts maintaining key order -- [ ] Deduplicate overlapping data -- [ ] Generate merged part metadata -- [ ] **Test Cases**: - - [ ] Merge correctness preserves all data - - [ ] Deduplication removes redundant entries - - [ ] Key ordering is maintained across parts - - [ ] Merged part metadata is accurate - -### 8.4 Block Merging (`merger.go` + `block.go`) -- [ ] **Implementation Tasks**: - - [ ] Integrate block reader for loading blocks from parts - - [ ] Add block merging logic with ordering preservation - - [ ] Implement merged block creation - - [ ] Add memory-efficient merge processing -- [ ] **Test Cases**: - - [ ] Block merge correctness across parts - - [ ] Ordering preservation during merge - - [ ] Memory efficiency during block merge - - [ ] Performance of block merge operations - ---- - -## Phase 9: Testing - -### 9.1 Unit Tests -- [ ] **Test block.go**: Block creation, initialization, validation -- [ ] Test all components individually -- [ ] Achieve >90% code coverage -- [ ] **Test Cases**: - - [ ] Component functionality works in isolation - - [ ] Edge cases are handled correctly - - [ ] Error conditions produce expected results - - [ ] Performance characteristics meet requirements - -### 9.2 Integration Tests -- [ ] End-to-end workflow testing -- [ ] Write-flush-merge-query cycles -- [ ] Multi-component interaction verification -- [ ] **Test Cases**: - - [ ] Full workflows complete successfully - - [ ] Component interactions work correctly - - [ ] Data consistency maintained throughout - - [ ] Performance acceptable under realistic loads - -### 9.3 Performance Benchmarks -- [ ] **Benchmark block operations**: Creation, serialization, scanning -- [ ] Throughput and latency measurements -- [ ] Memory usage profiling -- [ ] **Test Cases**: - - [ ] Performance regression detection - - [ ] Throughput meets design targets - - [ ] Latency remains within bounds - - [ ] Memory usage is reasonable - -### 9.4 Concurrency Tests -- [ ] Race condition detection with race detector -- [ ] Stress testing with concurrent operations -- [ ] Deadlock prevention verification -- [ ] **Test Cases**: 
- - [ ] Thread safety under concurrent load - - [ ] No deadlocks in normal operation - - [ ] Data races are eliminated - - [ ] System stability under stress - ---- - -## Remaining File Creation Checklist - -### Core Implementation Files (Remaining) -- [ ] `block_reader.go` - Block deserialization 🔥 -- [ ] `block_scanner.go` - Block scanning for queries 🔥 -- [ ] `flusher.go` - Flush operations -- [ ] `merger.go` - Merge operations -- [ ] `query.go` - Query interface and execution - -### Test Files (Remaining) -- [ ] `flusher_test.go` - Flusher testing -- [ ] `merger_test.go` - Merger testing -- [ ] `writer_test.go` - Writer testing -- [ ] `query_test.go` - Query testing -- [ ] `benchmark_test.go` - Performance benchmarks -- [ ] `integration_test.go` - Integration tests -- [ ] `concurrency_test.go` - Concurrency tests - -### Completed Files ✅ -**Core Implementation**: `sidx.go`, `element.go`, `tag.go`, `part.go`, `part_wrapper.go`, `mempart.go`, `block.go`, `block_writer.go`, `snapshot.go`, `introducer.go`, `metadata.go`, `options.go` - -**Test Files**: `sidx_test.go`, `element_test.go`, `tag_test.go`, `part_test.go`, `block_test.go`, `snapshot_test.go`, `introducer_test.go` - ---- - -## Success Criteria for Remaining Phases - -### Phase Completion Requirements -- [ ] All tasks in phase completed -- [ ] Unit tests pass with >90% coverage -- [ ] Integration tests pass for phase functionality -- [ ] No race conditions detected -- [ ] Performance benchmarks meet targets -- [ ] Clean linter output (`make lint`) -- [ ] Documentation updated - -### Overall Success Criteria -- [x] Phases 1-6 completed (33/40 tasks) ✅ -- [ ] Remaining 7 tasks completed -- [ ] Full test suite passes -- [ ] Performance meets design targets -- [ ] Code review approval -- [ ] Documentation complete -- [ ] Ready for production use - ---- - -## Block.go Usage Summary 🔥 - -The `block.go` file is central to the SIDX implementation and will be used in remaining phases: - -**Completed Usage** ✅: -1. **Phase 1.4**: Block structure creation and initialization -2. **Phase 4.2**: Block writer serialization -3. **Phase 4.4**: Block initialization from elements - -**Remaining Usage**: -5. **Phase 6.3**: Block scanner reads blocks during queries -6. **Phase 6.5**: Block reader deserializes blocks -7. **Phase 7.4**: Serialize blocks to disk during flush -8. **Phase 8.4**: Merge blocks from multiple parts -9. **Phase 9.1**: Unit tests for block operations -10. 
**Phase 9.3**: Performance benchmarks for block operations - ---- - -## Dependencies Between Remaining Tasks - -**Completed Foundation** ✅: -- Phases 1-5 provide all core data structures, interfaces, memory management, and snapshot management - -**Remaining Dependencies**: -- **Phase 6** can be developed independently (queries work with existing persisted data) -- **Phase 7** requires completed snapshot management from Phase 5 ✅ -- **Phase 8** requires Phase 7 completion (flush needed for merge) -- **Phase 9** requires completion of relevant phases for testing diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go index 55dd22f3..bfd592b3 100644 --- a/banyand/internal/sidx/block.go +++ b/banyand/internal/sidx/block.go @@ -245,6 +245,7 @@ func (b *block) mustWriteTo(sid common.SeriesID, bm *blockMetadata, ww *writers) // Write user keys to keys.bin and capture encoding information bm.keysEncodeType, bm.minKey = mustWriteKeysTo(&bm.keysBlock, b.userKeys, &ww.keysWriter) + bm.maxKey = b.userKeys[len(b.userKeys)-1] // Write data payloads to data.bin mustWriteDataTo(&bm.dataBlock, b.data, &ww.dataWriter) @@ -516,6 +517,9 @@ func (b *block) mustSeqReadFrom(decoder *encoding.BytesBlockDecoder, sr *seqRead } func (b *block) readUserKeys(sr *seqReaders, bm *blockMetadata) error { + if bm.keysBlock.offset != sr.keys.bytesRead { + logger.Panicf("offset %d must be equal to bytesRead %d", bm.keysBlock.offset, sr.keys.bytesRead) + } bb := bigValuePool.Get() if bb == nil { bb = &bytes.Buffer{} @@ -524,7 +528,7 @@ func (b *block) readUserKeys(sr *seqReaders, bm *blockMetadata) error { bb.Buf = bb.Buf[:0] bigValuePool.Put(bb) }() - bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(bm.keysBlock.size)) + bb.Buf = bytes.ResizeExact(bb.Buf[:0], int(bm.keysBlock.size)) sr.keys.mustReadFull(bb.Buf) var err error b.userKeys, err = encoding.BytesToInt64List(b.userKeys[:0], bb.Buf, bm.keysEncodeType, bm.minKey, int(bm.count)) @@ -535,6 +539,9 @@ func (b *block) readUserKeys(sr *seqReaders, bm *blockMetadata) error { } func (b *block) readData(decoder *encoding.BytesBlockDecoder, sr *seqReaders, bm *blockMetadata) error { + if bm.dataBlock.offset != sr.data.bytesRead { + logger.Panicf("offset %d must be equal to bytesRead %d", bm.dataBlock.offset, sr.data.bytesRead) + } bb := bigValuePool.Get() if bb == nil { bb = &bytes.Buffer{} @@ -543,7 +550,7 @@ func (b *block) readData(decoder *encoding.BytesBlockDecoder, sr *seqReaders, bm bb.Buf = bb.Buf[:0] bigValuePool.Put(bb) }() - bb.Buf = bytes.ResizeOver(bb.Buf, int(bm.dataBlock.size)) + bb.Buf = bytes.ResizeExact(bb.Buf, int(bm.dataBlock.size)) sr.data.mustReadFull(bb.Buf) var err error @@ -567,6 +574,9 @@ func (b *block) readTagData(decoder *encoding.BytesBlockDecoder, sr *seqReaders, } func (b *block) readSingleTag(decoder *encoding.BytesBlockDecoder, sr *seqReaders, tagName string, tagBlock *dataBlock, count int) error { + if tagBlock.offset != sr.tagMetadata[tagName].bytesRead { + logger.Panicf("offset %d must be equal to bytesRead %d", tagBlock.offset, sr.tagMetadata[tagName].bytesRead) + } tmReader, tmExists := sr.tagMetadata[tagName] if !tmExists { return fmt.Errorf("tag metadata reader not found for tag %s", tagName) @@ -585,7 +595,7 @@ func (b *block) readSingleTag(decoder *encoding.BytesBlockDecoder, sr *seqReader bigValuePool.Put(bb) }() - bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(tagBlock.size)) + bb.Buf = bytes.ResizeExact(bb.Buf[:0], int(tagBlock.size)) tmReader.mustReadFull(bb.Buf) tm, err := unmarshalTagMetadata(bb.Buf) if err != nil { @@ 
-593,7 +603,7 @@ func (b *block) readSingleTag(decoder *encoding.BytesBlockDecoder, sr *seqReader } defer releaseTagMetadata(tm) - bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(tm.dataBlock.size)) + bb.Buf = bytes.ResizeExact(bb.Buf[:0], int(tm.dataBlock.size)) tdReader.mustReadFull(bb.Buf) td := generateTagData() td.name = tagName diff --git a/banyand/internal/sidx/block_writer.go b/banyand/internal/sidx/block_writer.go index 88aea021..75645b4f 100644 --- a/banyand/internal/sidx/block_writer.go +++ b/banyand/internal/sidx/block_writer.go @@ -329,8 +329,6 @@ func (bw *blockWriter) mustWriteBlock(sid common.SeriesID, b *block) { bw.totalBlocksCount++ // Serialize block metadata - bm.setSeriesID(sid) - bm.setKeyRange(minKey, maxKey) bw.primaryBlockData = bm.marshal(bw.primaryBlockData) if len(bw.primaryBlockData) > maxUncompressedPrimaryBlockSize { @@ -343,8 +341,7 @@ func (bw *blockWriter) mustWriteBlock(sid common.SeriesID, b *block) { func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) { if len(data) > 0 { bw.primaryBlockMetadata.mustWriteBlock(data, bw.sidFirst, bw.minKey, bw.maxKey, &bw.writers) - bmData := bw.primaryBlockMetadata.marshal(bw.metaData[:0]) - bw.metaData = append(bw.metaData, bmData...) + bw.metaData = bw.primaryBlockMetadata.marshal(bw.metaData) } bw.hasWrittenBlocks = false bw.minKey = 0 diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go index 8b61c802..411c9e2a 100644 --- a/banyand/internal/sidx/interfaces.go +++ b/banyand/internal/sidx/interfaces.go @@ -53,6 +53,8 @@ type SIDX interface { Flush() error // Merge merges the specified parts into a new part. Merge(closeCh <-chan struct{}) error + // MergeMemPart merges the mem parts into a new part. + MergeMemParts(closeCh <-chan struct{}) error // PartsToSync returns the parts to sync. PartsToSync() []*part // StreamingParts returns the streaming parts. 
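[editor note] The interfaces.go hunk above extends the SIDX interface with MergeMemParts next to the existing Merge. Below is a minimal caller-side sketch assuming only the two signatures shown in the diff; the local `merger` interface and the `compactAll` helper are illustrative names, not repository code (in this patch the actual caller is the trace merger, which switches from Merge to MergeMemParts).

```go
package example

import "fmt"

// merger mirrors the two merge entry points of the SIDX interface touched by
// this commit. A local interface is used here because banyand/internal/sidx is
// an internal package and cannot be imported from outside its module subtree.
type merger interface {
	Merge(closeCh <-chan struct{}) error
	MergeMemParts(closeCh <-chan struct{}) error
}

// compactAll is a hypothetical helper: it first folds the in-memory parts into
// a single part, then runs the regular merge over the persisted parts.
func compactAll(m merger, closeCh <-chan struct{}) error {
	if err := m.MergeMemParts(closeCh); err != nil {
		return fmt.Errorf("merge mem parts: %w", err)
	}
	if err := m.Merge(closeCh); err != nil {
		return fmt.Errorf("merge parts: %w", err)
	}
	return nil
}
```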
diff --git a/banyand/internal/sidx/introducer.go b/banyand/internal/sidx/introducer.go index 34e6a8ed..c87bd163 100644 --- a/banyand/internal/sidx/introducer.go +++ b/banyand/internal/sidx/introducer.go @@ -80,7 +80,7 @@ func releaseFlusherIntroduction(i *flusherIntroduction) { type mergerIntroduction struct { merged map[uint64]struct{} - newPart *part + newPart *partWrapper applied chan struct{} } @@ -229,8 +229,7 @@ func (s *sidx) introduceMerged(nextIntroduction *mergerIntroduction, epoch uint6 nextSnp := cur.remove(epoch, nextIntroduction.merged) // Wrap the new part - pw := newPartWrapper(nil, nextIntroduction.newPart) - nextSnp.parts = append(nextSnp.parts, pw) + nextSnp.parts = append(nextSnp.parts, nextIntroduction.newPart) s.replaceSnapshot(nextSnp) s.persistSnapshot(nextSnp) diff --git a/banyand/internal/sidx/introducer_test.go b/banyand/internal/sidx/introducer_test.go index f601e995..a8651496 100644 --- a/banyand/internal/sidx/introducer_test.go +++ b/banyand/internal/sidx/introducer_test.go @@ -88,7 +88,7 @@ func TestIntroductionPooling(t *testing.T) { for i := 0; i < 10; i++ { intro := generateMergerIntroduction() intro.merged[uint64(i)] = struct{}{} - intro.newPart = &part{} + intro.newPart = &partWrapper{} intro.applied = make(chan struct{}) intros = append(intros, intro) } @@ -141,7 +141,7 @@ func TestIntroductionReset(t *testing.T) { // Set up merger introduction with data intro.merged[1] = struct{}{} intro.merged[2] = struct{}{} - intro.newPart = &part{} + intro.newPart = &partWrapper{} intro.applied = make(chan struct{}) // Reset the merger introduction @@ -331,7 +331,7 @@ func TestConcurrentPoolAccess(t *testing.T) { for j := 0; j < operationsPerGoroutine; j++ { intro := generateMergerIntroduction() intro.merged[uint64(j)] = struct{}{} - intro.newPart = &part{} + intro.newPart = &partWrapper{} intro.applied = make(chan struct{}) releaseMergerIntroduction(intro) } diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go index 4f348ae5..9acbdcd3 100644 --- a/banyand/internal/sidx/merge.go +++ b/banyand/internal/sidx/merge.go @@ -69,7 +69,55 @@ func (s *sidx) Merge(closeCh <-chan struct{}) error { if err != nil { return err } - mergeIntro.newPart = newPart.p + mergeIntro.newPart = newPart + + // Send to introducer loop + s.mergeCh <- mergeIntro + + // Wait for merge to complete + <-mergeIntro.applied + + return nil +} + +func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error { + snap := s.currentSnapshot() + if snap == nil { + return nil + } + defer snap.decRef() + + // Create merge introduction + mergeIntro := generateMergerIntroduction() + defer releaseMergerIntroduction(mergeIntro) + mergeIntro.applied = make(chan struct{}) + + // Select parts to merge (all active non-memory parts) + var partsToMerge []*partWrapper + for _, pw := range snap.parts { + if pw.isActive() && pw.isMemPart() { + partsToMerge = append(partsToMerge, pw) + } + } + + if len(partsToMerge) < 2 { + return nil + } + + // Mark parts for merging + for _, pw := range partsToMerge { + mergeIntro.merged[pw.ID()] = struct{}{} + } + + // Generate new part ID using atomic increment + newPartID := atomic.AddUint64(&s.curPartID, 1) + + // Create new merged part + newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge, newPartID, s.root) + if err != nil { + return err + } + mergeIntro.newPart = newPart // Send to introducer loop s.mergeCh <- mergeIntro diff --git a/banyand/internal/sidx/metadata.go b/banyand/internal/sidx/metadata.go index b60197ae..ba4a6f5f 100644 --- 
a/banyand/internal/sidx/metadata.go +++ b/banyand/internal/sidx/metadata.go @@ -402,18 +402,6 @@ func (bm *blockMetadata) TagsBlocks() map[string]dataBlock { return bm.tagsBlocks } -// setSeriesID sets the seriesID of the block. -func (bm *blockMetadata) setSeriesID(seriesID common.SeriesID) { - bm.seriesID = seriesID -} - -// setKeyRange sets the key range of the block. -func (bm *blockMetadata) setKeyRange(minKey, maxKey int64) { - bm.minKey = minKey - bm.maxKey = maxKey -} - -// setDataBlock sets the data block reference. func (bm *blockMetadata) setDataBlock(offset, size uint64) { bm.dataBlock = dataBlock{offset: offset, size: size} } diff --git a/banyand/internal/sidx/metadata_test.go b/banyand/internal/sidx/metadata_test.go index 3d25ef96..248bec42 100644 --- a/banyand/internal/sidx/metadata_test.go +++ b/banyand/internal/sidx/metadata_test.go @@ -587,10 +587,11 @@ func TestBlockMetadata_SetterMethods(t *testing.T) { defer releaseBlockMetadata(bm) // Test setter methods - bm.setSeriesID(common.SeriesID(456)) + bm.seriesID = common.SeriesID(456) assert.Equal(t, common.SeriesID(456), bm.seriesID) - bm.setKeyRange(20, 200) + bm.minKey = 20 + bm.maxKey = 200 assert.Equal(t, int64(20), bm.minKey) assert.Equal(t, int64(200), bm.maxKey) diff --git a/banyand/internal/sidx/multi_sidx_query_test.go b/banyand/internal/sidx/multi_sidx_query_test.go index 155c06bf..3249a50a 100644 --- a/banyand/internal/sidx/multi_sidx_query_test.go +++ b/banyand/internal/sidx/multi_sidx_query_test.go @@ -66,6 +66,10 @@ func (m *mockSIDX) Merge(_ <-chan struct{}) error { return nil } +func (m *mockSIDX) MergeMemParts(_ <-chan struct{}) error { + return nil +} + func (m *mockSIDX) PartsToSync() []*part { return nil } diff --git a/banyand/internal/sidx/part_iter.go b/banyand/internal/sidx/part_iter.go index 7b095848..d49e8611 100644 --- a/banyand/internal/sidx/part_iter.go +++ b/banyand/internal/sidx/part_iter.go @@ -268,7 +268,6 @@ type partMergeIter struct { compressedPrimaryBuf []byte primaryBuf []byte block blockPointer - partID uint64 primaryMetadataIdx int } @@ -277,7 +276,6 @@ func (pmi *partMergeIter) reset() { pmi.seqReaders.reset() pmi.primaryBlockMetadata = nil pmi.primaryMetadataIdx = 0 - pmi.partID = 0 pmi.primaryBuf = pmi.primaryBuf[:0] pmi.compressedPrimaryBuf = pmi.compressedPrimaryBuf[:0] pmi.block.reset() @@ -287,7 +285,6 @@ func (pmi *partMergeIter) mustInitFromPart(p *part) { pmi.reset() pmi.seqReaders.init(p) pmi.primaryBlockMetadata = p.primaryBlockMetadata - pmi.partID = p.partMetadata.ID } func (pmi *partMergeIter) error() error { diff --git a/banyand/internal/sidx/part_wrapper.go b/banyand/internal/sidx/part_wrapper.go index 0ca41099..37199f07 100644 --- a/banyand/internal/sidx/part_wrapper.go +++ b/banyand/internal/sidx/part_wrapper.go @@ -54,22 +54,11 @@ func (s partWrapperState) String() string { // It enables safe concurrent access to parts while managing their lifecycle. // When the reference count reaches zero, the underlying part is cleaned up. type partWrapper struct { - // p is the underlying part. It can be nil for memory parts. - p *part - - // mp is the memory part. It can be nil for file-based parts. - mp *memPart - - // ref is the atomic reference counter. - // It starts at 1 when the wrapper is created. - ref int32 - - // state tracks the lifecycle state of the part. - // State transitions: active -> removing -> removed - state int32 - - // removable indicates if the part should be removed from disk when dereferenced. 
- // This is typically true for parts that have been merged or are no longer needed. + p *part + mp *memPart + snapshot []uint64 + ref int32 + state int32 removable atomic.Bool } diff --git a/banyand/internal/sidx/part_wrapper_test.go b/banyand/internal/sidx/part_wrapper_test.go index 8951ee7c..4ca6af43 100644 --- a/banyand/internal/sidx/part_wrapper_test.go +++ b/banyand/internal/sidx/part_wrapper_test.go @@ -206,7 +206,7 @@ func TestPartWrapper_NilPart(t *testing.T) { require.NotNil(t, pw) assert.Equal(t, int32(1), pw.refCount()) - assert.Equal(t, uint64(0), pw.ID()) // Should return 0 for nil part + assert.Nil(t, pw.p) assert.True(t, pw.isActive()) // Test acquire/release with nil part diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go index 09692a2f..290c8938 100644 --- a/banyand/internal/sidx/query_result.go +++ b/banyand/internal/sidx/query_result.go @@ -70,6 +70,7 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata var err error tmpBlock.userKeys, err = encoding.BytesToInt64List(tmpBlock.userKeys[:0], bb.Buf, bm.keysEncodeType, bm.minKey, int(bm.count)) if err != nil { + logger.Panicf("cannot decode user keys: %v", err) return false } @@ -90,6 +91,7 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata decoder := &encoding.BytesBlockDecoder{} tmpBlock.data, err = decoder.Decode(tmpBlock.data[:0], bb2.Buf, bm.count) if err != nil { + logger.Panicf("cannot decode data payloads: %v", err) return false } @@ -156,6 +158,7 @@ func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, tag tm, err := unmarshalTagMetadata(bb.Buf) if err != nil { + logger.Panicf("cannot unmarshal tag metadata: %v", err) return false } defer releaseTagMetadata(tm) @@ -178,6 +181,7 @@ func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, tag // Decode tag values directly (no compression) td.values, err = internalencoding.DecodeTagValues(td.values[:0], decoder, bb2, tm.valueType, count) if err != nil { + logger.Panicf("cannot decode tag values: %v", err) return false } diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go index 352df8f3..14f3b8ec 100644 --- a/banyand/internal/sidx/sidx.go +++ b/banyand/internal/sidx/sidx.go @@ -453,8 +453,9 @@ func (s *sidx) Flush() error { } partPath := partPath(s.root, pw.ID()) pw.mp.mustFlush(s.fileSystem, partPath) - p := mustOpenPart(partPath, s.fileSystem) - flushIntro.flushed[p.partMetadata.ID] = pw + newPW := newPartWrapper(nil, mustOpenPart(partPath, s.fileSystem)) + newPW.p.partMetadata.ID = pw.ID() + flushIntro.flushed[newPW.ID()] = newPW } if len(flushIntro.flushed) == 0 { diff --git a/banyand/internal/sidx/snapshot.go b/banyand/internal/sidx/snapshot.go index e99182d6..5fdbf51e 100644 --- a/banyand/internal/sidx/snapshot.go +++ b/banyand/internal/sidx/snapshot.go @@ -277,6 +277,7 @@ func (s *snapshot) copyAllTo(epoch uint64) *snapshot { for _, pw := range result.parts { if pw != nil { pw.acquire() + pw.snapshot = append(pw.snapshot, epoch) } } @@ -291,10 +292,12 @@ func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper) *s for i := 0; i < len(s.parts); i++ { if n, ok := nextParts[s.parts[i].ID()]; ok { result.parts = append(result.parts, n) + n.snapshot = append(n.snapshot, nextEpoch) continue } if s.parts[i].acquire() { result.parts = append(result.parts, s.parts[i]) + s.parts[i].snapshot = append(s.parts[i].snapshot, nextEpoch) } } return &result diff --git a/banyand/trace/block.go 
b/banyand/trace/block.go index 9ba12bee..22a47f80 100644 --- a/banyand/trace/block.go +++ b/banyand/trace/block.go @@ -223,7 +223,7 @@ func (b *block) spanSize() uint64 { func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm blockMetadata) { b.reset() - b.spans = mustReadSpansFrom(b.spans, bm.spans, int(bm.count), p.spans) + b.spans = mustReadSpansFrom(decoder, b.spans, bm.spans, int(bm.count), p.spans) b.resizeTags(len(bm.tagProjection.Names)) for i, name := range bm.tagProjection.Names { @@ -244,7 +244,7 @@ func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm bl func (b *block) mustSeqReadFrom(decoder *encoding.BytesBlockDecoder, seqReaders *seqReaders, bm blockMetadata) { b.reset() - b.spans = mustSeqReadSpansFrom(b.spans, bm.spans, int(bm.count), &seqReaders.spans) + b.spans = mustSeqReadSpansFrom(decoder, b.spans, bm.spans, int(bm.count), &seqReaders.spans) b.resizeTags(len(bm.tags)) keys := make([]string, 0, len(bm.tags)) @@ -275,36 +275,26 @@ func mustWriteSpansTo(sm *dataBlock, spans [][]byte, spanWriter *writer) { defer bigValuePool.Release(bb) sm.offset = spanWriter.bytesWritten - for _, span := range spans { - bb.Buf = encoding.VarUint64ToBytes(bb.Buf, uint64(len(span))) - bb.Buf = append(bb.Buf, span...) - } + bb.Buf = encoding.EncodeBytesBlock(bb.Buf, spans) sm.size = uint64(len(bb.Buf)) spanWriter.MustWrite(bb.Buf) } -func mustReadSpansFrom(spans [][]byte, sm *dataBlock, count int, reader fs.Reader) [][]byte { +func mustReadSpansFrom(decoder *encoding.BytesBlockDecoder, spans [][]byte, sm *dataBlock, count int, reader fs.Reader) [][]byte { bb := bigValuePool.Generate() defer bigValuePool.Release(bb) bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(sm.size)) fs.MustReadData(reader, int64(sm.offset), bb.Buf) - - src := bb.Buf spans = resizeSpans(spans, count) - var spanLen uint64 - for i := 0; i < count; i++ { - src, spanLen = encoding.BytesToVarUint64(src) - if uint64(len(src)) < spanLen { - logger.Panicf("insufficient data for span: need %d bytes, have %d", spanLen, len(src)) - } - spans[i] = append(spans[i], src[:spanLen]...) - src = src[spanLen:] + spans, err := decoder.Decode(spans[:0], bb.Buf, uint64(count)) + if err != nil { + logger.Panicf("cannot decode spans: %v", err) } return spans } -func mustSeqReadSpansFrom(spans [][]byte, sm *dataBlock, count int, reader *seqReader) [][]byte { +func mustSeqReadSpansFrom(decoder *encoding.BytesBlockDecoder, spans [][]byte, sm *dataBlock, count int, reader *seqReader) [][]byte { if sm.offset != reader.bytesRead { logger.Panicf("offset %d must be equal to bytesRead %d", sm.offset, reader.bytesRead) } @@ -312,17 +302,9 @@ func mustSeqReadSpansFrom(spans [][]byte, sm *dataBlock, count int, reader *seqR defer bigValuePool.Release(bb) bb.Buf = pkgbytes.ResizeExact(bb.Buf, int(sm.size)) reader.mustReadFull(bb.Buf) - - src := bb.Buf - spans = resizeSpans(spans, count) - var spanLen uint64 - for i := 0; i < count; i++ { - src, spanLen = encoding.BytesToVarUint64(src) - if uint64(len(src)) < spanLen { - logger.Panicf("insufficient data for span: need %d bytes, have %d", spanLen, len(src)) - } - spans[i] = append(spans[i], src[:spanLen]...) 
- src = src[spanLen:] + spans, err := decoder.Decode(spans[:0], bb.Buf, uint64(count)) + if err != nil { + logger.Panicf("cannot decode spans: %v", err) } return spans } diff --git a/banyand/trace/block_test.go b/banyand/trace/block_test.go index 398bc36f..7984a44e 100644 --- a/banyand/trace/block_test.go +++ b/banyand/trace/block_test.go @@ -225,6 +225,7 @@ func Test_mustWriteAndReadSpans(t *testing.T) { spans: [][]byte{[]byte("span1"), []byte("span2"), []byte("span3")}, }, } + decoder := &encoding.BytesBlockDecoder{} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { defer func() { @@ -238,7 +239,7 @@ func Test_mustWriteAndReadSpans(t *testing.T) { w := new(writer) w.init(b) mustWriteSpansTo(sm, tt.spans, w) - spans := mustReadSpansFrom(nil, sm, len(tt.spans), b) + spans := mustReadSpansFrom(decoder, nil, sm, len(tt.spans), b) if !reflect.DeepEqual(spans, tt.spans) { t.Errorf("mustReadSpansFrom() spans = %v, want %v", spans, tt.spans) } diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go index a19400b8..148aa6fa 100644 --- a/banyand/trace/merger.go +++ b/banyand/trace/merger.go @@ -113,7 +113,7 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator snapshotCreator, part return nil, err } for _, sidxInstance := range tst.getAllSidx() { - if err := sidxInstance.Merge(closeCh); err != nil { + if err := sidxInstance.MergeMemParts(closeCh); err != nil { tst.l.Warn().Err(err).Msg("sidx merge failed") return nil, err } diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go index 6f873ffe..2019335a 100644 --- a/banyand/trace/snapshot.go +++ b/banyand/trace/snapshot.go @@ -128,7 +128,6 @@ func (s *snapshot) remove(nextEpoch uint64, merged map[partHandle]struct{}) snap s.parts[i].removable.Store(true) removedCount++ } - logger.Infof("removed %d parts, merged %d parts", removedCount, len(merged)) return result } diff --git a/test/stress/stream-vs-trace/data_generator.go b/test/stress/stream-vs-trace/data_generator.go index 09081e5d..6a39cd86 100644 --- a/test/stress/stream-vs-trace/data_generator.go +++ b/test/stress/stream-vs-trace/data_generator.go @@ -20,7 +20,9 @@ package streamvstrace import ( "context" "crypto/rand" + "errors" "fmt" + "io" "math" "math/big" "strings" @@ -744,11 +746,15 @@ func (c *StreamClient) WriteStreamData(ctx context.Context, spans []*SpanData) e } // Wait for final response - _, err = stream.Recv() - if err != nil && err.Error() != "EOF" { - return fmt.Errorf("failed to receive final response: %w", err) + for i := 0; i < len(spans); i++ { + _, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return fmt.Errorf("failed to receive final response: %w", err) + } } - return nil } @@ -774,9 +780,14 @@ func (c *TraceClient) WriteTraceData(ctx context.Context, spans []*SpanData) err } // Wait for final response - _, err = stream.Recv() - if err != nil && err.Error() != "EOF" { - return fmt.Errorf("failed to receive final response: %w", err) + for i := 0; i < len(spans); i++ { + _, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return fmt.Errorf("failed to receive final response: %w", err) + } } return nil diff --git a/test/stress/stream-vs-trace/docker/.gitignore b/test/stress/stream-vs-trace/docker/.gitignore index fadb27cc..d21a26c8 100644 --- a/test/stress/stream-vs-trace/docker/.gitignore +++ b/test/stress/stream-vs-trace/docker/.gitignore @@ -11,4 +11,9 @@ trace-data/ coverage.html # Binary files -*.test \ No newline at end of file +*.test + +# Stats files 
+container-stats.json +performance-summary.txt +stats_pid diff --git a/test/stress/stream-vs-trace/docker/README.md b/test/stress/stream-vs-trace/docker/README.md index 3e528ecb..049ebd9e 100644 --- a/test/stress/stream-vs-trace/docker/README.md +++ b/test/stress/stream-vs-trace/docker/README.md @@ -54,7 +54,7 @@ Each container is configured with: ## Prerequisites - Docker and Docker Compose -- Go 1.23 or later +- Go 1.25 or later - GNU Make ## Usage diff --git a/test/stress/stream-vs-trace/docker/client_wrappers.go b/test/stress/stream-vs-trace/docker/client_wrappers.go index 8e786c49..18f6e4a9 100644 --- a/test/stress/stream-vs-trace/docker/client_wrappers.go +++ b/test/stress/stream-vs-trace/docker/client_wrappers.go @@ -19,34 +19,34 @@ package docker import ( "google.golang.org/grpc" - + streamvstrace "github.com/apache/skywalking-banyandb/test/stress/stream-vs-trace" ) -// DockerStreamClient wraps StreamClient with connection exposed -type DockerStreamClient struct { +// StreamClient wraps StreamClient with connection exposed. +type StreamClient struct { *streamvstrace.StreamClient conn *grpc.ClientConn } -// NewDockerStreamClient creates a new DockerStreamClient instance -func NewDockerStreamClient(conn *grpc.ClientConn) *DockerStreamClient { - return &DockerStreamClient{ +// NewStreamClient creates a new DockerStreamClient instance. +func NewStreamClient(conn *grpc.ClientConn) *StreamClient { + return &StreamClient{ StreamClient: streamvstrace.NewStreamClient(conn), conn: conn, } } -// DockerTraceClient wraps TraceClient with connection exposed -type DockerTraceClient struct { +// TraceClient wraps TraceClient with connection exposed. +type TraceClient struct { *streamvstrace.TraceClient conn *grpc.ClientConn } -// NewDockerTraceClient creates a new DockerTraceClient instance -func NewDockerTraceClient(conn *grpc.ClientConn) *DockerTraceClient { - return &DockerTraceClient{ +// NewTraceClient creates a new DockerTraceClient instance. +func NewTraceClient(conn *grpc.ClientConn) *TraceClient { + return &TraceClient{ TraceClient: streamvstrace.NewTraceClient(conn), conn: conn, } -} \ No newline at end of file +} diff --git a/test/stress/stream-vs-trace/docker/collect-stats.sh b/test/stress/stream-vs-trace/docker/collect-stats.sh new file mode 100755 index 00000000..018caec2 --- /dev/null +++ b/test/stress/stream-vs-trace/docker/collect-stats.sh @@ -0,0 +1,238 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file to +# you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Container stats collection script +# Collects performance metrics during test execution + +set -e + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +STATS_FILE="$SCRIPT_DIR/container-stats.json" +SUMMARY_FILE="$SCRIPT_DIR/performance-summary.txt" + +# Colors for output +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +# Container names +STREAM_CONTAINER="banyandb-stream" +TRACE_CONTAINER="banyandb-trace" + +# Function to check if containers exist +check_containers() { + if ! docker ps --format "table {{.Names}}" | grep -q "$STREAM_CONTAINER"; then + echo -e "${YELLOW}Warning: Stream container ($STREAM_CONTAINER) not found${NC}" + return 1 + fi + + if ! docker ps --format "table {{.Names}}" | grep -q "$TRACE_CONTAINER"; then + echo -e "${YELLOW}Warning: Trace container ($TRACE_CONTAINER) not found${NC}" + return 1 + fi + + return 0 +} + +# Function to collect stats for a single container +collect_container_stats() { + local container_name=$1 + local timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ") + + # Get container stats in JSON format + local stats=$(docker stats --no-stream --format "json" "$container_name" 2>/dev/null || echo "{}") + + # Add timestamp and container name + echo "$stats" | jq --arg timestamp "$timestamp" --arg container "$container_name" \ + '. + {timestamp: $timestamp, container: $container}' 2>/dev/null || echo "{}" +} + +# Function to start continuous stats collection +start_stats_collection() { + echo -e "${GREEN}Starting container stats collection...${NC}" + + # Initialize stats file + echo "[]" > "$STATS_FILE" + + # Check if containers exist + if ! check_containers; then + echo -e "${YELLOW}Some containers not found, stats collection may be incomplete${NC}" + fi + + # Start background collection process + ( + while true; do + # Collect stats for both containers + local stream_stats=$(collect_container_stats "$STREAM_CONTAINER") + local trace_stats=$(collect_container_stats "$TRACE_CONTAINER") + + # Add to stats file + if [ "$stream_stats" != "{}" ] || [ "$trace_stats" != "{}" ]; then + local temp_file=$(mktemp) + jq --argjson stream "$stream_stats" --argjson trace "$trace_stats" \ + '. + [$stream, $trace]' "$STATS_FILE" > "$temp_file" 2>/dev/null || echo "[]" > "$temp_file" + mv "$temp_file" "$STATS_FILE" + fi + + sleep 5 # Collect every 5 seconds + done + ) & + + echo $! > "$SCRIPT_DIR/stats_pid" + echo -e "${GREEN}Stats collection started (PID: $(cat $SCRIPT_DIR/stats_pid))${NC}" +} + +# Function to stop stats collection +stop_stats_collection() { + if [ -f "$SCRIPT_DIR/stats_pid" ]; then + local pid=$(cat "$SCRIPT_DIR/stats_pid") + if kill -0 "$pid" 2>/dev/null; then + kill "$pid" + echo -e "${GREEN}Stats collection stopped${NC}" + fi + rm -f "$SCRIPT_DIR/stats_pid" + fi +} + +# Function to generate performance summary +generate_summary() { + echo -e "${GREEN}Generating performance summary...${NC}" + + if [ ! -f "$STATS_FILE" ] || [ ! -s "$STATS_FILE" ]; then + echo -e "${YELLOW}No stats data found${NC}" + return 1 + fi + + # Create summary file + cat > "$SUMMARY_FILE" << EOF +======================================== +Container Performance Summary +======================================== +Generated: $(date) +Test Duration: $(jq -r '.[0].timestamp + " to " + .[-1].timestamp' "$STATS_FILE" 2>/dev/null || echo "Unknown") + +EOF + + # Process stats for each container + for container in "$STREAM_CONTAINER" "$TRACE_CONTAINER"; do + echo "Processing stats for $container..." 
+ + # Filter stats for this container + local container_stats=$(jq --arg container "$container" '.[] | select(.container == $container)' "$STATS_FILE") + + if [ -z "$container_stats" ] || [ "$container_stats" = "" ]; then + echo -e "${YELLOW}No stats found for $container${NC}" + continue + fi + + # Calculate metrics + local cpu_avg=$(echo "$container_stats" | jq -r '.CPUPerc' | sed 's/%//' | awk '{sum+=$1; count++} END {if(count>0) print sum/count; else print 0}') + local cpu_max=$(echo "$container_stats" | jq -r '.CPUPerc' | sed 's/%//' | awk 'BEGIN{max=0} {if($1>max) max=$1} END {print max}') + local mem_avg=$(echo "$container_stats" | jq -r '.MemUsage' | awk -F'/' '{print $1}' | sed 's/[^0-9.]//g' | awk '{sum+=$1; count++} END {if(count>0) print sum/count; else print 0}') + local mem_max=$(echo "$container_stats" | jq -r '.MemUsage' | awk -F'/' '{print $1}' | sed 's/[^0-9.]//g' | awk 'BEGIN{max=0} {if($1>max) max=$1} END {print max}') + local mem_limit=$(echo "$container_stats" | jq -r '.MemUsage' | awk -F'/' '{print $2}' | head -1 | sed 's/[^0-9.]//g') + local net_rx=$(echo "$container_stats" | jq -r '.NetIO' | awk -F'/' '{print $1}' | head -1) + local net_tx=$(echo "$container_stats" | jq -r '.NetIO' | awk -F'/' '{print $2}' | head -1) + local block_read=$(echo "$container_stats" | jq -r '.BlockIO' | awk -F'/' '{print $1}' | head -1) + local block_write=$(echo "$container_stats" | jq -r '.BlockIO' | awk -F'/' '{print $2}' | head -1) + local sample_count=$(echo "$container_stats" | jq -s 'length') + + # Add to summary + cat >> "$SUMMARY_FILE" << EOF + +---------------------------------------- +$container Performance Metrics +---------------------------------------- +Sample Count: $sample_count + +CPU Usage: + Average: ${cpu_avg}% + Maximum: ${cpu_max}% + +Memory Usage: + Average: ${mem_avg}MB + Maximum: ${mem_max}MB + Limit: ${mem_limit}MB + Usage %: $(echo "scale=2; $mem_avg * 100 / $mem_limit" | bc 2>/dev/null || echo "N/A")% + +Network I/O: + Received: $net_rx + Transmitted: $net_tx + +Block I/O: + Read: $block_read + Write: $block_write + +EOF + done + + # Add comparison section + cat >> "$SUMMARY_FILE" << EOF + +---------------------------------------- +Performance Comparison +---------------------------------------- +EOF + + # Compare CPU usage + local stream_cpu_avg=$(jq --arg container "$STREAM_CONTAINER" '.[] | select(.container == $container) | .CPUPerc' "$STATS_FILE" | sed 's/%//' | awk '{sum+=$1; count++} END {if(count>0) print sum/count; else print 0}') + local trace_cpu_avg=$(jq --arg container "$TRACE_CONTAINER" '.[] | select(.container == $container) | .CPUPerc' "$STATS_FILE" | sed 's/%//' | awk '{sum+=$1; count++} END {if(count>0) print sum/count; else print 0}') + + if [ "$stream_cpu_avg" != "0" ] && [ "$trace_cpu_avg" != "0" ]; then + local cpu_ratio=$(echo "scale=2; $stream_cpu_avg / $trace_cpu_avg" | bc 2>/dev/null || echo "N/A") + echo "CPU Usage Ratio (Stream/Trace): $cpu_ratio" >> "$SUMMARY_FILE" + fi + + # Compare Memory usage + local stream_mem_avg=$(jq --arg container "$STREAM_CONTAINER" '.[] | select(.container == $container) | .MemUsage' "$STATS_FILE" | awk -F'/' '{print $1}' | sed 's/[^0-9.]//g' | awk '{sum+=$1; count++} END {if(count>0) print sum/count; else print 0}') + local trace_mem_avg=$(jq --arg container "$TRACE_CONTAINER" '.[] | select(.container == $container) | .MemUsage' "$STATS_FILE" | awk -F'/' '{print $1}' | sed 's/[^0-9.]//g' | awk '{sum+=$1; count++} END {if(count>0) print sum/count; else print 0}') + + if [ "$stream_mem_avg" != "0" ] && [ 
"$trace_mem_avg" != "0" ]; then + local mem_ratio=$(echo "scale=2; $stream_mem_avg / $trace_mem_avg" | bc 2>/dev/null || echo "N/A") + echo "Memory Usage Ratio (Stream/Trace): $mem_ratio" >> "$SUMMARY_FILE" + fi + + echo -e "${GREEN}Performance summary saved to: $SUMMARY_FILE${NC}" + echo -e "${GREEN}Raw stats data saved to: $STATS_FILE${NC}" +} + +# Function to cleanup +cleanup() { + stop_stats_collection + rm -f "$SCRIPT_DIR/stats_pid" +} + +# Main execution +case "${1:-start}" in + start) + start_stats_collection + ;; + stop) + stop_stats_collection + ;; + summary) + generate_summary + ;; + cleanup) + cleanup + ;; + *) + echo "Usage: $0 {start|stop|summary|cleanup}" + exit 1 + ;; +esac diff --git a/test/stress/stream-vs-trace/docker/docker_test.go b/test/stress/stream-vs-trace/docker/docker_test.go index f5a170ce..80ba6c1b 100644 --- a/test/stress/stream-vs-trace/docker/docker_test.go +++ b/test/stress/stream-vs-trace/docker/docker_test.go @@ -21,7 +21,6 @@ package docker import ( "context" "fmt" - "os" "testing" "time" @@ -38,9 +37,6 @@ import ( ) func TestStreamVsTraceDocker(t *testing.T) { - if os.Getenv("DOCKER_TEST") != "true" { - t.Skip("Skipping Docker test. Set DOCKER_TEST=true to run.") - } gomega.RegisterFailHandler(g.Fail) g.RunSpecs(t, "Stream vs Trace Performance Docker Suite", g.Label("docker", "performance", "slow")) } @@ -55,8 +51,8 @@ var _ = g.Describe("Stream vs Trace Performance Docker", func() { g.It("should run performance comparison using Docker containers", func() { // Define connection addresses for the two containers - streamAddr := "localhost:17912" // Stream container gRPC port - traceAddr := "localhost:27912" // Trace container gRPC port + streamAddr := "localhost:17912" // Stream container gRPC port + traceAddr := "localhost:27912" // Trace container gRPC port // Wait for both containers to be ready g.By("Waiting for Stream container to be ready") @@ -77,8 +73,8 @@ var _ = g.Describe("Stream vs Trace Performance Docker", func() { defer traceConn.Close() // Create Docker clients with exposed connections - streamClient := NewDockerStreamClient(streamConn) - traceClient := NewDockerTraceClient(traceConn) + streamClient := NewStreamClient(streamConn) + traceClient := NewTraceClient(traceConn) // Create context for operations ctx := context.Background() @@ -128,8 +124,7 @@ var _ = g.Describe("Stream vs Trace Performance Docker", func() { }) }) -// loadSchemasToContainer loads the required schemas into both containers -func loadSchemasToContainer(ctx context.Context, streamClient *DockerStreamClient, traceClient *DockerTraceClient) error { +func loadSchemasToContainer(ctx context.Context, streamClient *StreamClient, traceClient *TraceClient) error { // Create schema clients for both connections streamSchemaClient := NewSchemaClient(streamClient.conn) traceSchemaClient := NewSchemaClient(traceClient.conn) @@ -145,4 +140,4 @@ func loadSchemasToContainer(ctx context.Context, streamClient *DockerStreamClien } return nil -} \ No newline at end of file +} diff --git a/test/stress/stream-vs-trace/docker/example-stats-output.md b/test/stress/stream-vs-trace/docker/example-stats-output.md new file mode 100644 index 00000000..16aa8f2f --- /dev/null +++ b/test/stress/stream-vs-trace/docker/example-stats-output.md @@ -0,0 +1,129 @@ +# Container Performance Monitoring Example + +This document shows an example of the performance data collected during the Stream vs Trace Docker test. + +## Generated Files + +After running the test, the following files are generated: + +1. 
**`container-stats.json`** - Raw performance data in JSON format +2. **`performance-summary.txt`** - Human-readable performance summary + +## Example Performance Summary + +``` +======================================== +Container Performance Summary +======================================== +Generated: 2024-01-15 10:30:45 +Test Duration: 2024-01-15T10:25:30Z to 2024-01-15T10:30:45Z + +---------------------------------------- +banyandb-stream Performance Metrics +---------------------------------------- +Sample Count: 60 + +CPU Usage: + Average: 45.2% + Maximum: 78.5% + +Memory Usage: + Average: 1250.5MB + Maximum: 1450.2MB + Limit: 4096MB + Usage %: 30.5% + +Network I/O: + Received: 125.4MB + Transmitted: 89.7MB + +Block I/O: + Read: 45.2MB + Write: 12.8MB + +---------------------------------------- +banyandb-trace Performance Metrics +---------------------------------------- +Sample Count: 60 + +CPU Usage: + Average: 52.8% + Maximum: 85.2% + +Memory Usage: + Average: 1380.3MB + Maximum: 1620.1MB + Limit: 4096MB + Usage %: 33.7% + +Network I/O: + Received: 98.7MB + Transmitted: 76.3MB + +Block I/O: + Read: 38.9MB + Write: 15.2MB + +---------------------------------------- +Performance Comparison +---------------------------------------- +CPU Usage Ratio (Stream/Trace): 0.86 +Memory Usage Ratio (Stream/Trace): 0.91 +``` + +## Data Points Collected + +The monitoring system collects the following performance metrics every 5 seconds: + +### CPU Metrics +- **Average CPU Usage**: Mean CPU utilization percentage +- **Maximum CPU Usage**: Peak CPU utilization during test +- **CPU Usage Ratio**: Comparison between Stream and Trace containers + +### Memory Metrics +- **Average Memory Usage**: Mean memory consumption in MB +- **Maximum Memory Usage**: Peak memory consumption during test +- **Memory Limit**: Container memory limit +- **Memory Usage Percentage**: Percentage of limit used +- **Memory Usage Ratio**: Comparison between Stream and Trace containers + +### Network I/O Metrics +- **Network Received**: Total data received from network +- **Network Transmitted**: Total data transmitted to network + +### Block I/O Metrics +- **Block Read**: Total data read from disk +- **Block Write**: Total data written to disk + +### Sample Information +- **Sample Count**: Number of data points collected +- **Test Duration**: Start and end timestamps of monitoring + +## Usage + +To run the test with performance monitoring: + +```bash +# Run complete test with monitoring +./run-docker-test.sh all + +# Or run just the test +./run-docker-test.sh test + +# View performance summary from last test +./run-docker-test.sh summary + +# View current container stats +./run-docker-test.sh stats +``` + +## Analysis + +The performance summary provides insights into: + +1. **Resource Efficiency**: Which container (Stream vs Trace) uses resources more efficiently +2. **Performance Characteristics**: CPU and memory usage patterns during different test phases +3. **I/O Patterns**: Network and disk I/O behavior differences +4. **Scalability**: How resource usage scales with workload + +This data helps in understanding the performance characteristics of the Stream and Trace models under load. 
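[editor note] As a companion to the summary format documented above, here is a hedged Go sketch for post-processing the raw `container-stats.json` file. Field names follow `docker stats --format json` plus the `container`/`timestamp` keys the collect-stats.sh script injects via jq; the `statSample` type and `averageCPU` helper are illustrative only and not part of the test harness.

```go
package example

import (
	"encoding/json"
	"os"
	"strconv"
	"strings"
)

// statSample keeps only the fields needed for a CPU average; CPUPerc comes
// from docker stats, container is added by the collection script.
type statSample struct {
	Container string `json:"container"`
	CPUPerc   string `json:"CPUPerc"`
}

// averageCPU reads container-stats.json and returns the mean CPU percentage
// per container, skipping empty placeholder samples ("{}").
func averageCPU(path string) (map[string]float64, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var samples []statSample
	if err := json.Unmarshal(raw, &samples); err != nil {
		return nil, err
	}
	sums, counts := map[string]float64{}, map[string]float64{}
	for _, s := range samples {
		v, err := strconv.ParseFloat(strings.TrimSuffix(s.CPUPerc, "%"), 64)
		if err != nil {
			continue // empty or malformed sample
		}
		sums[s.Container] += v
		counts[s.Container]++
	}
	avg := map[string]float64{}
	for c, sum := range sums {
		avg[c] = sum / counts[c]
	}
	return avg, nil
}
```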
diff --git a/test/stress/stream-vs-trace/docker/run-docker-test.sh b/test/stress/stream-vs-trace/docker/run-docker-test.sh index b4596668..6394e6d2 100755 --- a/test/stress/stream-vs-trace/docker/run-docker-test.sh +++ b/test/stress/stream-vs-trace/docker/run-docker-test.sh @@ -51,6 +51,7 @@ show_usage() { echo " logs Show container logs" echo " ps Show container status" echo " stats Show container resource usage" + echo " summary Show performance summary from last test" echo " all Run complete test (build, up, test, down)" echo " help Show this help message" echo "" @@ -96,21 +97,42 @@ do_test() { # Set environment variable to enable Docker test export DOCKER_TEST=true - # Run the test from the docker directory to ensure relative paths work + # Start container stats collection + echo -e "${GREEN}Starting container performance monitoring...${NC}" cd "$SCRIPT_DIR" + ./collect-stats.sh start + + # Run the test from the docker directory to ensure relative paths work go test -v -timeout 30m ./... -run TestStreamVsTraceDocker + # Stop stats collection and generate summary + echo -e "${GREEN}Stopping performance monitoring...${NC}" + ./collect-stats.sh stop + ./collect-stats.sh summary + echo -e "${GREEN}Test completed successfully!${NC}" - # Optional: Show container resource usage - echo -e "\n${YELLOW}Container Resource Usage:${NC}" + # Show final container resource usage + echo -e "\n${YELLOW}Final Container Resource Usage:${NC}" docker stats --no-stream banyandb-stream banyandb-trace + + # Display performance summary + if [ -f "performance-summary.txt" ]; then + echo -e "\n${GREEN}Performance Summary:${NC}" + cat performance-summary.txt + fi } # Function to stop containers do_down() { echo -e "${YELLOW}Stopping containers...${NC}" cd "$SCRIPT_DIR" + + # Stop stats collection if running + if [ -f "collect-stats.sh" ]; then + ./collect-stats.sh stop 2>/dev/null || true + fi + docker compose down echo -e "${GREEN}Containers stopped.${NC}" } @@ -119,6 +141,15 @@ do_down() { do_clean() { echo -e "${YELLOW}Cleaning up everything...${NC}" cd "$SCRIPT_DIR" + + # Stop stats collection and cleanup + if [ -f "collect-stats.sh" ]; then + ./collect-stats.sh cleanup 2>/dev/null || true + fi + + # Remove stats files + rm -f container-stats.json performance-summary.txt stats_pid + docker compose down -v docker compose rm -f echo -e "${GREEN}Cleanup complete.${NC}" @@ -141,6 +172,17 @@ do_stats() { docker stats --no-stream banyandb-stream banyandb-trace } +# Function to show performance summary +do_summary() { + cd "$SCRIPT_DIR" + if [ -f "performance-summary.txt" ]; then + echo -e "${GREEN}Performance Summary from Last Test:${NC}" + cat performance-summary.txt + else + echo -e "${YELLOW}No performance summary found. Run a test first.${NC}" + fi +} + # Function to run all steps do_all() { echo -e "${GREEN}Stream vs Trace Performance Test - Complete Workflow${NC}" @@ -206,6 +248,9 @@ case $COMMAND in stats) do_stats ;; + summary) + do_summary + ;; all) do_all ;; diff --git a/test/stress/stream-vs-trace/docker/schema_client.go b/test/stress/stream-vs-trace/docker/schema_client.go index ad403d2c..52b5882d 100644 --- a/test/stress/stream-vs-trace/docker/schema_client.go +++ b/test/stress/stream-vs-trace/docker/schema_client.go @@ -35,27 +35,27 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" ) -// SchemaClient handles schema operations via gRPC +// SchemaClient handles schema operations via gRPC. 
type SchemaClient struct { - groupClient databasev1.GroupRegistryServiceClient - streamClient databasev1.StreamRegistryServiceClient - traceClient databasev1.TraceRegistryServiceClient - indexRuleClient databasev1.IndexRuleRegistryServiceClient + groupClient databasev1.GroupRegistryServiceClient + streamClient databasev1.StreamRegistryServiceClient + traceClient databasev1.TraceRegistryServiceClient + indexRuleClient databasev1.IndexRuleRegistryServiceClient indexBindingClient databasev1.IndexRuleBindingRegistryServiceClient } -// NewSchemaClient creates a new schema client +// NewSchemaClient creates a new schema client. func NewSchemaClient(conn *grpc.ClientConn) *SchemaClient { return &SchemaClient{ - groupClient: databasev1.NewGroupRegistryServiceClient(conn), - streamClient: databasev1.NewStreamRegistryServiceClient(conn), - traceClient: databasev1.NewTraceRegistryServiceClient(conn), - indexRuleClient: databasev1.NewIndexRuleRegistryServiceClient(conn), + groupClient: databasev1.NewGroupRegistryServiceClient(conn), + streamClient: databasev1.NewStreamRegistryServiceClient(conn), + traceClient: databasev1.NewTraceRegistryServiceClient(conn), + indexRuleClient: databasev1.NewIndexRuleRegistryServiceClient(conn), indexBindingClient: databasev1.NewIndexRuleBindingRegistryServiceClient(conn), } } -// LoadStreamSchemas loads all stream-related schemas +// LoadStreamSchemas loads all stream-related schemas. func (s *SchemaClient) LoadStreamSchemas(ctx context.Context) error { // Load stream group if err := s.loadStreamGroup(ctx); err != nil { @@ -80,7 +80,7 @@ func (s *SchemaClient) LoadStreamSchemas(ctx context.Context) error { return nil } -// LoadTraceSchemas loads all trace-related schemas +// LoadTraceSchemas loads all trace-related schemas. func (s *SchemaClient) LoadTraceSchemas(ctx context.Context) error { // Load trace group if err := s.loadTraceGroup(ctx); err != nil { @@ -369,4 +369,4 @@ func (s *SchemaClient) loadTraceIndexRuleBindings(ctx context.Context) error { } return nil -} \ No newline at end of file +} diff --git a/test/stress/stream-vs-trace/metrics.go b/test/stress/stream-vs-trace/metrics.go index c35f9d7f..8abb6259 100644 --- a/test/stress/stream-vs-trace/metrics.go +++ b/test/stress/stream-vs-trace/metrics.go @@ -2,7 +2,8 @@ // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under -// the License. +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0
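[editor note] Beyond the docker tooling, the core change in banyand/trace/block.go replaces the hand-rolled varint framing of spans with the shared bytes-block codec. The round-trip sketch below uses the two calls visible in the diff (`encoding.EncodeBytesBlock` and `BytesBlockDecoder.Decode`); the import path is assumed to be the repository's pkg/encoding package, and the `roundTripSpans` name is illustrative only.

```go
package example

import "github.com/apache/skywalking-banyandb/pkg/encoding"

// roundTripSpans mirrors mustWriteSpansTo/mustReadSpansFrom after this commit:
// all spans are packed into one block and recovered with a reusable decoder.
func roundTripSpans(spans [][]byte) ([][]byte, error) {
	// Encode every span into a single block (replaces per-span varint framing).
	buf := encoding.EncodeBytesBlock(nil, spans)

	// Decode with a decoder that can be reused across blocks.
	decoder := &encoding.BytesBlockDecoder{}
	return decoder.Decode(nil, buf, uint64(len(spans)))
}
```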
