This is an automated email from the ASF dual-hosted git repository. mrproliu pushed a commit to branch dump-ut in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e7b8ac996dfe1b051377793823618048805d73bd Author: mrproliu <[email protected]> AuthorDate: Tue Jun 23 16:23:56 2026 +0800 Make dump UT more stable --- banyand/internal/dump/measure/suite_test.go | 15 +++++++++------ banyand/internal/storage/index.go | 11 +++++++++++ banyand/internal/storage/segment.go | 9 +++++++++ pkg/index/index.go | 4 ++++ pkg/index/inverted/inverted.go | 21 +++++++++++++++++++++ 5 files changed, 54 insertions(+), 6 deletions(-) diff --git a/banyand/internal/dump/measure/suite_test.go b/banyand/internal/dump/measure/suite_test.go index b0b28271b..76ecd84df 100644 --- a/banyand/internal/dump/measure/suite_test.go +++ b/banyand/internal/dump/measure/suite_test.go @@ -353,16 +353,19 @@ func TestMeasureIndexedTagResolvedFromIndex(t *testing.T) { segmentPath := findSidxSegmentPath(t, rootPath) - // The series index persists asynchronously (unsafe batches + persister) and is - // flushed on stop; after a hard stop there is a brief window before all series - // are readable on disk. The fallback scan below sources EntityValues purely - // from this index, so wait until every written series is visible before - // asserting, otherwise the scan can race the flush and recover nothing. + // Wait until every written series is visible to a fresh read-only reader of + // the on-disk series index before stopping the service. The fallback scan + // below sources EntityValues purely from this index, so every series must be + // persisted first; the shutdown path (segmentController.close) additionally + // flushes any documents still in memory. Poll at a coarse interval: opening a + // fresh read-only reader is not free, and hammering it competes for CPU/IO + // with the persister goroutine being waited on, which can starve the + // persister under the loaded -race CI run. 
sidxPath := filepath.Join(segmentPath, "sidx") require.Eventually(t, func() bool { count, _ := inverted.ReadOnlyDocCount(sidxPath) return count >= int64(total) - }, 60*time.Second, 100*time.Millisecond, "series index not fully persisted after stop") + }, 90*time.Second, time.Second, "series index not fully persisted before stop") // Stop the live service so it releases bluge's exclusive lock on the series // index; the dump (like the offline CLI) reads the index from a quiesced diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index 5b9f17737..e623d6c98 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -425,6 +425,17 @@ func (s *seriesIndex) SearchWithoutSeries(ctx context.Context, opts IndexSearchO return sd, sortedValues, err } +// Flush persists pending in-memory documents to disk and blocks until done, so a +// subsequently-opened offline reader (dump/migration) sees a complete index. The +// bluge writer's Close does not persist them, so the shutdown path must Flush +// before Close. +func (s *seriesIndex) Flush() error { + if s == nil { + return nil + } + return s.store.Flush() +} + func (s *seriesIndex) Close() error { if s == nil { return nil diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 727e41540..36723d2ed 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -1046,6 +1046,15 @@ func (sc *segmentController[T, O]) close() { // also has its directory removed; the rest keep their data on disk. for _, s := range sc.lst { s.mu.Lock() + // Persist pending in-memory series documents before closing so an offline + // reader (dump/migration) sees a complete index; skip segments flagged for + // deletion since their directory is about to be removed. This runs only on + // shutdown, not on the idle-reclaim/delete paths. 
+ if s.index != nil && atomic.LoadUint32(&s.mustBeDeleted) == 0 { + if err := s.index.Flush(); err != nil { + s.l.Warn().Err(err).Msg("failed to flush the series index on shutdown") + } + } s.closeResourcesLocked() if atomic.LoadUint32(&s.mustBeDeleted) != 0 { s.lfs.MustRMAll(s.location) diff --git a/pkg/index/index.go b/pkg/index/index.go index 2e5f11db6..cde15d787 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -827,6 +827,10 @@ type Store interface { Searcher CollectMetrics(...string) Reset() + // Flush persists all in-memory documents to disk and blocks until the + // persistence completes, so that a subsequently-opened offline reader sees a + // durable, complete index. + Flush() error TakeFileSnapshot(dst string) error Stats() (dataCount int64, dataSizeBytes int64) } diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index d6ec0aaae..9bc6d6a49 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -297,6 +297,27 @@ func (s *store) Reset() { s.writer.ResetCache() } +// Flush persists every document currently held in memory to disk and blocks, without a timeout, +// until that persistence completes. The bluge writer's Close does not persist pending in-memory +// segments, so callers needing a durable on-disk index after a graceful stop -- e.g. the offline +// dump/migration tools -- must Flush first. If the store's closer rejects new work (AddRunning +// fails), Flush returns nil without flushing. Otherwise it submits an empty batch carrying a +// persisted callback; because the persister always advances the epoch, the callback fires once +// the root snapshot (covering all earlier in-memory segments) is on disk.
+func (s *store) Flush() error { + if !s.closer.AddRunning() { + return nil + } + defer s.closer.Done() + b := bluge.NewBatch() + ch := make(chan error, 1) + b.SetPersistedCallback(func(err error) { ch <- err }) + if err := s.writer.Batch(b); err != nil { + return err + } + return <-ch +} + // ReadOnlyDocCount opens the index directory at path read-only and returns the // number of indexed documents. Unlike NewStore it never acquires the exclusive // directory lock, so it can inspect a closed (or even concurrently open)
