This is an automated email from the ASF dual-hosted git repository.

mrproliu pushed a commit to branch dump-ut
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e7b8ac996dfe1b051377793823618048805d73bd
Author: mrproliu <[email protected]>
AuthorDate: Tue Jun 23 16:23:56 2026 +0800

    Make dump UT more stable
---
 banyand/internal/dump/measure/suite_test.go | 15 +++++++++------
 banyand/internal/storage/index.go           | 11 +++++++++++
 banyand/internal/storage/segment.go         |  9 +++++++++
 pkg/index/index.go                          |  4 ++++
 pkg/index/inverted/inverted.go              | 21 +++++++++++++++++++++
 5 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/banyand/internal/dump/measure/suite_test.go b/banyand/internal/dump/measure/suite_test.go
index b0b28271b..76ecd84df 100644
--- a/banyand/internal/dump/measure/suite_test.go
+++ b/banyand/internal/dump/measure/suite_test.go
@@ -353,16 +353,19 @@ func TestMeasureIndexedTagResolvedFromIndex(t *testing.T) {
 
        segmentPath := findSidxSegmentPath(t, rootPath)
 
-       // The series index persists asynchronously (unsafe batches + persister) and is
-       // flushed on stop; after a hard stop there is a brief window before all series
-       // are readable on disk. The fallback scan below sources EntityValues purely
-       // from this index, so wait until every written series is visible before
-       // asserting, otherwise the scan can race the flush and recover nothing.
+       // Wait until every written series has been ingested into the segment's series
+       // index before stopping the service. The fallback scan below sources
+       // EntityValues purely from this index, so the writes must have landed first;
+       // stopping the service then flushes the index durably to disk (the shutdown
+       // path in segmentController.close). Poll at a coarse interval: opening a fresh
+       // read-only reader is not free and hammering it competes for CPU/IO with the
+       // persister goroutine it is waiting on, which can starve the persister under
+       // the loaded -race CI run.
        sidxPath := filepath.Join(segmentPath, "sidx")
        require.Eventually(t, func() bool {
                count, _ := inverted.ReadOnlyDocCount(sidxPath)
                return count >= int64(total)
-       }, 60*time.Second, 100*time.Millisecond, "series index not fully persisted after stop")
+       }, 90*time.Second, time.Second, "series index not fully persisted before stop")
 
        // Stop the live service so it releases bluge's exclusive lock on the series
        // index; the dump (like the offline CLI) reads the index from a quiesced
diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index 5b9f17737..e623d6c98 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -425,6 +425,17 @@ func (s *seriesIndex) SearchWithoutSeries(ctx context.Context, opts IndexSearchO
        return sd, sortedValues, err
 }
 
+// Flush persists pending in-memory documents to disk and blocks until done, so a
+// subsequently-opened offline reader (dump/migration) sees a complete index. The
+// bluge writer's Close does not persist them, so the shutdown path must Flush
+// before Close.
+func (s *seriesIndex) Flush() error {
+       if s == nil {
+               return nil
+       }
+       return s.store.Flush()
+}
+
 func (s *seriesIndex) Close() error {
        if s == nil {
                return nil
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 727e41540..36723d2ed 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -1046,6 +1046,15 @@ func (sc *segmentController[T, O]) close() {
        // also has its directory removed; the rest keep their data on disk.
        for _, s := range sc.lst {
                s.mu.Lock()
+               // Persist pending in-memory series documents before closing so an offline
+               // reader (dump/migration) sees a complete index; skip segments flagged for
+               // deletion since their directory is about to be removed. This runs only on
+               // shutdown, not on the idle-reclaim/delete paths.
+               if s.index != nil && atomic.LoadUint32(&s.mustBeDeleted) == 0 {
+                       if err := s.index.Flush(); err != nil {
+                               s.l.Warn().Err(err).Msg("failed to flush the series index on shutdown")
+                       }
+               }
                s.closeResourcesLocked()
                if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
                        s.lfs.MustRMAll(s.location)
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 2e5f11db6..cde15d787 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -827,6 +827,10 @@ type Store interface {
        Searcher
        CollectMetrics(...string)
        Reset()
+       // Flush persists all in-memory documents to disk and blocks until the
+       // persistence completes, so that a subsequently-opened offline reader sees a
+       // durable, complete index.
+       Flush() error
        TakeFileSnapshot(dst string) error
        Stats() (dataCount int64, dataSizeBytes int64)
 }
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index d6ec0aaae..9bc6d6a49 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -297,6 +297,27 @@ func (s *store) Reset() {
        s.writer.ResetCache()
 }
 
+// Flush forces every document currently held in memory to be persisted to disk
+// and blocks until that persistence completes. The bluge writer's Close does not
+// persist pending in-memory segments, so callers needing a durable on-disk index
+// after a graceful stop -- e.g. an offline reader such as the dump/migration
+// tools -- must Flush first. It submits an empty batch carrying a persisted
+// callback; because the persister always advances the epoch, the callback fires
+// once the root snapshot (covering all earlier in-memory segments) is on disk.
+func (s *store) Flush() error {
+       if !s.closer.AddRunning() {
+               return nil
+       }
+       defer s.closer.Done()
+       b := bluge.NewBatch()
+       ch := make(chan error, 1)
+       b.SetPersistedCallback(func(err error) { ch <- err })
+       if err := s.writer.Batch(b); err != nil {
+               return err
+       }
+       return <-ch
+}
+
 // ReadOnlyDocCount opens the index directory at path read-only and returns the
 // number of indexed documents. Unlike NewStore it never acquires the exclusive
 // directory lock, so it can inspect a closed (or even concurrently open)

Reply via email to