This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug/unset-partid in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 49375b1106f2916617245cfef59f023411dbbf56 Author: Gao Hongtao <[email protected]> AuthorDate: Wed Oct 1 05:37:04 2025 +0000 Fix the unset partID. Refactor mustOpenPart function and update related tests to accept partID as a parameter --- banyand/internal/sidx/block_reader_test.go | 2 +- banyand/internal/sidx/interfaces.go | 4 +-- banyand/internal/sidx/iter_test.go | 14 +++++----- banyand/internal/sidx/merge.go | 24 ++++++++-------- banyand/internal/sidx/merge_test.go | 2 +- banyand/internal/sidx/multi_sidx_query_test.go | 8 +++--- banyand/internal/sidx/part.go | 3 +- banyand/internal/sidx/part_iter_test.go | 16 +++++------ banyand/internal/sidx/part_test.go | 4 +-- banyand/internal/sidx/part_wrapper.go | 28 +++++++++++++++---- banyand/internal/sidx/part_wrapper_test.go | 14 +++++++++- banyand/internal/sidx/sidx.go | 5 ++-- banyand/trace/flusher.go | 2 +- banyand/trace/merger.go | 38 +++++++++++++++++++++----- 14 files changed, 108 insertions(+), 56 deletions(-) diff --git a/banyand/internal/sidx/block_reader_test.go b/banyand/internal/sidx/block_reader_test.go index 83660e61..06ef81b3 100644 --- a/banyand/internal/sidx/block_reader_test.go +++ b/banyand/internal/sidx/block_reader_test.go @@ -206,7 +206,7 @@ func Test_blockReader_nextBlock(t *testing.T) { mpp = append(mpp, mp) mp.mustInitFromElements(elems) mp.mustFlush(fileSystem, partPath(tmpPath, uint64(i))) - filePW := newPartWrapper(nil, mustOpenPart(partPath(tmpPath, uint64(i)), fileSystem)) + filePW := newPartWrapper(nil, mustOpenPart(uint64(i), partPath(tmpPath, uint64(i)), fileSystem)) filePW.p.partMetadata.ID = uint64(i) fpp = append(fpp, filePW) pp = append(pp, filePW.p) diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go index 411c9e2a..f514886f 100644 --- a/banyand/internal/sidx/interfaces.go +++ b/banyand/internal/sidx/interfaces.go @@ -52,9 +52,9 @@ type SIDX interface { // Flush flushes the SIDX instance to disk.
Flush() error // Merge merges the specified parts into a new part. - Merge(closeCh <-chan struct{}) error + Merge(closeCh <-chan struct{}) (uint64, error) // MergeMemPart merges the mem parts into a new part. - MergeMemParts(closeCh <-chan struct{}) error + MergeMemParts(closeCh <-chan struct{}) (uint64, error) // PartsToSync returns the parts to sync. PartsToSync() []*part // StreamingParts returns the streaming parts. diff --git a/banyand/internal/sidx/iter_test.go b/banyand/internal/sidx/iter_test.go index 53f34e6f..24d61235 100644 --- a/banyand/internal/sidx/iter_test.go +++ b/banyand/internal/sidx/iter_test.go @@ -233,7 +233,7 @@ func TestIterComprehensive(t *testing.T) { if partType == "file_based" { partDir := filepath.Join(tempDir, fmt.Sprintf("%s_%s_part%d", partType, tc.name, i)) mp.mustFlush(testFS, partDir) - testPart = mustOpenPart(partDir, testFS) + testPart = mustOpenPart(uint64(i), partDir, testFS) } else { testPart = openMemPart(mp) } @@ -278,7 +278,7 @@ func TestIterEdgeCases(t *testing.T) { partDir := filepath.Join(tempDir, "empty_series") mp.mustFlush(testFS, partDir) - testPart := mustOpenPart(partDir, testFS) + testPart := mustOpenPart(1, partDir, testFS) defer testPart.close() bma := generateBlockMetadataArray() @@ -317,9 +317,9 @@ func TestIterEdgeCases(t *testing.T) { mp1.mustFlush(testFS, partDir1) mp2.mustFlush(testFS, partDir2) - testPart1 := mustOpenPart(partDir1, testFS) + testPart1 := mustOpenPart(1, partDir1, testFS) defer testPart1.close() - testPart2 := mustOpenPart(partDir2, testFS) + testPart2 := mustOpenPart(2, partDir2, testFS) defer testPart2.close() bma := generateBlockMetadataArray() @@ -346,7 +346,7 @@ func TestIterEdgeCases(t *testing.T) { partDir := filepath.Join(tempDir, "single_part") mp.mustFlush(testFS, partDir) - testPart := mustOpenPart(partDir, testFS) + testPart := mustOpenPart(1, partDir, testFS) defer testPart.close() bma := generateBlockMetadataArray() @@ -427,9 +427,9 @@ func TestIterOrdering(t *testing.T) { 
mp1.mustFlush(testFS, partDir1) mp2.mustFlush(testFS, partDir2) - testPart1 := mustOpenPart(partDir1, testFS) + testPart1 := mustOpenPart(1, partDir1, testFS) defer testPart1.close() - testPart2 := mustOpenPart(partDir2, testFS) + testPart2 := mustOpenPart(2, partDir2, testFS) defer testPart2.close() bma := generateBlockMetadataArray() diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go index 9acbdcd3..e384bd95 100644 --- a/banyand/internal/sidx/merge.go +++ b/banyand/internal/sidx/merge.go @@ -31,11 +31,11 @@ var ( ) // Merge implements Merger interface. -func (s *sidx) Merge(closeCh <-chan struct{}) error { +func (s *sidx) Merge(closeCh <-chan struct{}) (uint64, error) { // Get current snapshot snap := s.currentSnapshot() if snap == nil { - return nil + return 0, nil } defer snap.decRef() @@ -53,7 +53,7 @@ func (s *sidx) Merge(closeCh <-chan struct{}) error { } if len(partsToMerge) < 2 { - return nil + return 0, nil } // Mark parts for merging @@ -67,7 +67,7 @@ func (s *sidx) Merge(closeCh <-chan struct{}) error { // Create new merged part newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge, newPartID, s.root) if err != nil { - return err + return 0, err } mergeIntro.newPart = newPart @@ -77,13 +77,13 @@ func (s *sidx) Merge(closeCh <-chan struct{}) error { // Wait for merge to complete <-mergeIntro.applied - return nil + return uint64(len(partsToMerge)), nil } -func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error { +func (s *sidx) MergeMemParts(closeCh <-chan struct{}) (uint64, error) { snap := s.currentSnapshot() if snap == nil { - return nil + return 0, nil } defer snap.decRef() @@ -92,7 +92,7 @@ func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error { defer releaseMergerIntroduction(mergeIntro) mergeIntro.applied = make(chan struct{}) - // Select parts to merge (all active non-memory parts) + // Select parts to merge (all active memory parts) var partsToMerge []*partWrapper for _, pw := range snap.parts { if 
pw.isActive() && pw.isMemPart() { @@ -101,7 +101,7 @@ func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error { } if len(partsToMerge) < 2 { - return nil + return 0, nil } // Mark parts for merging @@ -115,7 +115,7 @@ func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error { // Create new merged part newPart, err := s.mergeParts(s.fileSystem, closeCh, partsToMerge, newPartID, s.root) if err != nil { - return err + return 0, err } mergeIntro.newPart = newPart @@ -125,7 +125,7 @@ func (s *sidx) MergeMemParts(closeCh <-chan struct{}) error { // Wait for merge to complete <-mergeIntro.applied - return nil + return uint64(len(partsToMerge)), nil } func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}, parts []*partWrapper, partID uint64, root string) (*partWrapper, error) { @@ -158,7 +158,7 @@ func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}, par } pm.mustWriteMetadata(fileSystem, dstPath) fileSystem.SyncPath(dstPath) - p := mustOpenPart(dstPath, fileSystem) + p := mustOpenPart(partID, dstPath, fileSystem) return newPartWrapper(nil, p), nil } diff --git a/banyand/internal/sidx/merge_test.go b/banyand/internal/sidx/merge_test.go index bf6eba95..178d565c 100644 --- a/banyand/internal/sidx/merge_test.go +++ b/banyand/internal/sidx/merge_test.go @@ -418,7 +418,7 @@ func Test_mergeParts(t *testing.T) { mp.mustInitFromElements(es) partPath := filepath.Join(tmpPath, "part_"+string(rune('0'+i))) mp.mustFlush(fileSystem, partPath) - filePart := mustOpenPart(partPath, fileSystem) + filePart := mustOpenPart(uint64(i), partPath, fileSystem) filePW := newPartWrapper(nil, filePart) filePW.p.partMetadata.ID = uint64(i) fpp = append(fpp, filePW) diff --git a/banyand/internal/sidx/multi_sidx_query_test.go b/banyand/internal/sidx/multi_sidx_query_test.go index 3249a50a..853e2c16 100644 --- a/banyand/internal/sidx/multi_sidx_query_test.go +++ b/banyand/internal/sidx/multi_sidx_query_test.go @@ -62,12 +62,12 @@ func (m *mockSIDX) 
Flush() error { return nil } -func (m *mockSIDX) Merge(_ <-chan struct{}) error { - return nil +func (m *mockSIDX) Merge(_ <-chan struct{}) (uint64, error) { + return 0, nil } -func (m *mockSIDX) MergeMemParts(_ <-chan struct{}) error { - return nil +func (m *mockSIDX) MergeMemParts(_ <-chan struct{}) (uint64, error) { + return 0, nil } func (m *mockSIDX) PartsToSync() []*part { diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go index 5b02efbb..3413812a 100644 --- a/banyand/internal/sidx/part.go +++ b/banyand/internal/sidx/part.go @@ -104,7 +104,7 @@ func (p *part) ID() uint64 { // mustOpenPart opens a part from the specified path using the given file system. // It opens all standard files and discovers tag files automatically. // Panics if any required file cannot be opened. -func mustOpenPart(path string, fileSystem fs.FileSystem) *part { +func mustOpenPart(partID uint64, path string, fileSystem fs.FileSystem) *part { p := &part{ path: path, fileSystem: fileSystem, @@ -121,6 +121,7 @@ func mustOpenPart(path string, fileSystem fs.FileSystem) *part { p.close() logger.GetLogger().Panic().Err(err).Str("path", path).Msg("failed to load part metadata") } + p.partMetadata.ID = partID // Load primary block metadata from primary.bin. 
p.loadPrimaryBlockMetadata() diff --git a/banyand/internal/sidx/part_iter_test.go b/banyand/internal/sidx/part_iter_test.go index eb67ab1d..a64fdde5 100644 --- a/banyand/internal/sidx/part_iter_test.go +++ b/banyand/internal/sidx/part_iter_test.go @@ -314,7 +314,7 @@ func TestPartIterVerification(t *testing.T) { partDir := filepath.Join(tempDir, fmt.Sprintf("part_%s", tt.name)) mp.mustFlush(testFS, partDir) - part := mustOpenPart(partDir, testFS) + part := mustOpenPart(1, partDir, testFS) defer part.close() // Run the test case @@ -376,7 +376,7 @@ func TestPartIterEdgeCases(t *testing.T) { partDir := filepath.Join(tempDir, "empty_series_test") mp.mustFlush(testFS, partDir) - part := mustOpenPart(partDir, testFS) + part := mustOpenPart(1, partDir, testFS) defer part.close() // Test with empty series list @@ -417,7 +417,7 @@ func TestPartIterEdgeCases(t *testing.T) { partDir := filepath.Join(tempDir, "no_match_key_range") mp.mustFlush(testFS, partDir) - part := mustOpenPart(partDir, testFS) + part := mustOpenPart(1, partDir, testFS) defer part.close() // Test with non-overlapping key range @@ -458,7 +458,7 @@ func TestPartIterEdgeCases(t *testing.T) { partDir := filepath.Join(tempDir, "no_match_series") mp.mustFlush(testFS, partDir) - part := mustOpenPart(partDir, testFS) + part := mustOpenPart(1, partDir, testFS) defer part.close() // Test with different series ID @@ -504,7 +504,7 @@ func TestPartIterBlockFilter(t *testing.T) { partDir := filepath.Join(tempDir, "nil_filter") mp.mustFlush(testFS, partDir) - part := mustOpenPart(partDir, testFS) + part := mustOpenPart(1, partDir, testFS) defer part.close() // Test with nil blockFilter @@ -546,7 +546,7 @@ func TestPartIterBlockFilter(t *testing.T) { partDir := filepath.Join(tempDir, "allow_all_filter") mp.mustFlush(testFS, partDir) - part := mustOpenPart(partDir, testFS) + part := mustOpenPart(1, partDir, testFS) defer part.close() // Create a mock filter that allows all blocks @@ -591,7 +591,7 @@ func 
TestPartIterBlockFilter(t *testing.T) { partDir := filepath.Join(tempDir, "skip_all_filter") mp.mustFlush(testFS, partDir) - part := mustOpenPart(partDir, testFS) + part := mustOpenPart(1, partDir, testFS) defer part.close() // Create a mock filter that skips all blocks @@ -636,7 +636,7 @@ func TestPartIterBlockFilter(t *testing.T) { partDir := filepath.Join(tempDir, "error_filter") mp.mustFlush(testFS, partDir) - part := mustOpenPart(partDir, testFS) + part := mustOpenPart(1, partDir, testFS) defer part.close() // Create a mock filter that returns an error diff --git a/banyand/internal/sidx/part_test.go b/banyand/internal/sidx/part_test.go index 728a92ca..d89f8e89 100644 --- a/banyand/internal/sidx/part_test.go +++ b/banyand/internal/sidx/part_test.go @@ -139,7 +139,7 @@ func TestPartStringRepresentation(t *testing.T) { require.NoError(t, err) } - part := mustOpenPart(tempDir, testFS) + part := mustOpenPart(pm.ID, tempDir, testFS) expectedString := fmt.Sprintf("sidx part %d at %s", pm.ID, tempDir) assert.Equal(t, expectedString, part.String()) @@ -390,7 +390,7 @@ func TestMemPartFlushAndReadAllRoundTrip(t *testing.T) { mp.mustFlush(testFS, partDir) // Step 3: Open the flushed part from disk - part := mustOpenPart(partDir, testFS) + part := mustOpenPart(1, partDir, testFS) defer part.close() // Step 4: Read all elements back from part diff --git a/banyand/internal/sidx/part_wrapper.go b/banyand/internal/sidx/part_wrapper.go index cd589f99..42244f80 100644 --- a/banyand/internal/sidx/part_wrapper.go +++ b/banyand/internal/sidx/part_wrapper.go @@ -168,11 +168,7 @@ func (pw *partWrapper) markForRemoval() { } // ID returns the unique identifier of the part. -// Returns 0 if the part is nil. 
func (pw *partWrapper) ID() uint64 { - if pw.p == nil || pw.p.partMetadata == nil { - return 0 - } return pw.p.partMetadata.ID } @@ -218,12 +214,32 @@ func (pw *partWrapper) String() string { } if pw.mp != nil { + var id uint64 + if pw.mp.partMetadata != nil { + id = pw.mp.partMetadata.ID + } return fmt.Sprintf("partWrapper{id=%d, state=%s, ref=%d, memPart=true}", - pw.ID(), state, refCount) + id, state, refCount) + } + + if pw.p == nil { + return fmt.Sprintf("partWrapper{id=nil, state=%s, ref=%d, part=nil}", state, refCount) + } + + // Handle case where p.partMetadata might be nil after cleanup + var id uint64 + var path string + if pw.p.partMetadata != nil { + id = pw.p.partMetadata.ID + } + if pw.p.path != "" { + path = pw.p.path + } else { + path = "unknown" } return fmt.Sprintf("partWrapper{id=%d, state=%s, ref=%d, path=%s}", - pw.ID(), state, refCount, pw.p.path) + id, state, refCount, path) } // overlapsKeyRange checks if the part overlaps with the given key range. diff --git a/banyand/internal/sidx/part_wrapper_test.go b/banyand/internal/sidx/part_wrapper_test.go index 4ca6af43..9a1a0971 100644 --- a/banyand/internal/sidx/part_wrapper_test.go +++ b/banyand/internal/sidx/part_wrapper_test.go @@ -229,7 +229,19 @@ func TestPartWrapper_MultipleReleases(t *testing.T) { pw := newPartWrapper(nil, p) - // Release once (should reach 0) + // Test multiple releases before cleanup + pw.acquire() // ref count = 2 + pw.acquire() // ref count = 3 + + pw.release() // ref count = 2 + assert.Equal(t, int32(2), pw.refCount()) + assert.True(t, pw.isActive()) + + pw.release() // ref count = 1 + assert.Equal(t, int32(1), pw.refCount()) + assert.True(t, pw.isActive()) + + // Final release should trigger cleanup and set ref to 0 pw.release() assert.Equal(t, int32(0), pw.refCount()) assert.True(t, pw.isRemoved()) diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go index 64777d6b..aa792e8d 100644 --- a/banyand/internal/sidx/sidx.go +++ 
b/banyand/internal/sidx/sidx.go @@ -459,8 +459,7 @@ func (s *sidx) Flush() error { Str("part_path", partPath). Msg("flushing sidx part") } - newPW := newPartWrapper(nil, mustOpenPart(partPath, s.fileSystem)) - newPW.p.partMetadata.ID = pw.ID() + newPW := newPartWrapper(nil, mustOpenPart(pw.ID(), partPath, s.fileSystem)) flushIntro.flushed[newPW.ID()] = newPW } @@ -868,7 +867,7 @@ func (s *sidx) loadSnapshot(epoch uint64, loadedParts []uint64) { continue } partPath := partPath(s.root, id) - part := mustOpenPart(partPath, s.fileSystem) + part := mustOpenPart(id, partPath, s.fileSystem) part.partMetadata.ID = id pw := newPartWrapper(nil, part) snp.addPart(pw) diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go index 76096b4b..ce8338af 100644 --- a/banyand/trace/flusher.go +++ b/banyand/trace/flusher.go @@ -192,7 +192,7 @@ func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan *mergerIntroductio // merge memory must not be closed by the tsTable.close closeCh := make(chan struct{}) newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts, - currentMergedIDs, mergeCh, closeCh, "mem") + currentMergedIDs, mergeCh, closeCh, mergeTypeMem) close(closeCh) if err != nil { if errors.Is(err, errClosed) { diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go index 148aa6fa..fe2fb194 100644 --- a/banyand/trace/merger.go +++ b/banyand/trace/merger.go @@ -33,6 +33,11 @@ import ( var mergeMaxConcurrencyCh = make(chan struct{}, cgroups.CPUs()) +var ( + mergeTypeMem = "mem" + mergeTypeFile = "file" +) + func (tst *tsTable) mergeLoop(merges chan *mergerIntroduction, flusherNotifier watcher.Channel) { defer tst.loopCloser.Done() @@ -95,7 +100,7 @@ func (tst *tsTable) mergeSnapshot(curSnapshot *snapshot, merges chan *mergerIntr return nil, nil } if _, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMerger, dst, - toBeMerged, merges, tst.loopCloser.CloseNotify(), "file"); err != nil { + toBeMerged, merges, 
tst.loopCloser.CloseNotify(), mergeTypeFile); err != nil { return dst, err } return dst, nil @@ -112,12 +117,6 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator snapshotCreator, part if err != nil { return nil, err } - for _, sidxInstance := range tst.getAllSidx() { - if err := sidxInstance.MergeMemParts(closeCh); err != nil { - tst.l.Warn().Err(err).Msg("sidx merge failed") - return nil, err - } - } elapsed := time.Since(start) tst.incTotalMergeLatency(elapsed.Seconds(), typ) tst.incTotalMerged(1, typ) @@ -157,6 +156,31 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator snapshotCreator, part Msg("background merger merges unbalanced parts") } } + for sidxName, sidxInstance := range tst.getAllSidx() { + start = time.Now() + var mergedPartsCount uint64 + var err error + if typ == mergeTypeMem { + mergedPartsCount, err = sidxInstance.MergeMemParts(closeCh) + if err != nil { + tst.l.Warn().Err(err).Msg("sidx merge mem parts failed") + return nil, err + } + } else { + mergedPartsCount, err = sidxInstance.Merge(closeCh) + if err != nil { + tst.l.Warn().Err(err).Msg("sidx merge file parts failed") + return nil, err + } + } + elapsed = time.Since(start) + tst.incTotalMergeLatency(elapsed.Seconds(), fmt.Sprintf("%s_%s", typ, sidxName)) + tst.incTotalMerged(1, fmt.Sprintf("%s_%s", typ, sidxName)) + tst.incTotalMergedParts(int(mergedPartsCount), fmt.Sprintf("%s_%s", typ, sidxName)) + if elapsed > 30*time.Second { + tst.l.Warn().Uint64("mergedPartsCount", mergedPartsCount).Str("sidxName", sidxName).Dur("elapsed", elapsed).Msg("sidx merge parts took too long") + } + } mi := generateMergerIntroduction() defer releaseMergerIntroduction(mi)
