Copilot commented on code in PR #1188:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1188#discussion_r3458209688


##########
pkg/index/inverted/inverted.go:
##########
@@ -297,6 +297,27 @@ func (s *store) Reset() {
        s.writer.ResetCache()
 }
 
+// Flush forces every document currently held in memory to be persisted to disk
+// and blocks until that persistence completes. The bluge writer's Close does 
not
+// persist pending in-memory segments, so callers needing a durable on-disk 
index
+// after a graceful stop -- e.g. an offline reader such as the dump/migration
+// tools -- must Flush first. It submits an empty batch carrying a persisted
+// callback; because the persister always advances the epoch, the callback 
fires
+// once the root snapshot (covering all earlier in-memory segments) is on disk.
+func (s *store) Flush() error {
+       if !s.closer.AddRunning() {
+               return nil
+       }
+       defer s.closer.Done()
+       b := bluge.NewBatch()
+       ch := make(chan error, 1)
+       b.SetPersistedCallback(func(err error) { ch <- err })
+       if err := s.writer.Batch(b); err != nil {
+               return err
+       }
+       return <-ch
+}

Review Comment:
   `Flush()` silently returns `nil` when `AddRunning()` fails (likely meaning 
the store is closing/closed). That can report a successful flush even though no 
persistence barrier was executed, reintroducing the durability gap in shutdown 
races. Prefer returning a non-nil error (e.g., a package-level 
`ErrClosed/ErrClosing`) so callers can surface/log it, or adjust the closer 
semantics so `Flush()` can still run (or explicitly block) during a graceful 
shutdown sequence.



##########
banyand/internal/dump/measure/suite_test.go:
##########
@@ -353,16 +353,19 @@ func TestMeasureIndexedTagResolvedFromIndex(t *testing.T) 
{
 
        segmentPath := findSidxSegmentPath(t, rootPath)
 
-       // The series index persists asynchronously (unsafe batches + 
persister) and is
-       // flushed on stop; after a hard stop there is a brief window before 
all series
-       // are readable on disk. The fallback scan below sources EntityValues 
purely
-       // from this index, so wait until every written series is visible before
-       // asserting, otherwise the scan can race the flush and recover nothing.
+       // Wait until every written series has been ingested into the segment's 
series
+       // index before stopping the service. The fallback scan below sources
+       // EntityValues purely from this index, so the writes must have landed 
first;
+       // stopping the service then flushes the index durably to disk (the 
shutdown
+       // path in segmentController.close). Poll at a coarse interval: opening 
a fresh
+       // read-only reader is not free and hammering it competes for CPU/IO 
with the
+       // persister goroutine it is waiting on, which can starve the persister 
under
+       // the loaded -race CI run.
        sidxPath := filepath.Join(segmentPath, "sidx")
        require.Eventually(t, func() bool {
                count, _ := inverted.ReadOnlyDocCount(sidxPath)
                return count >= int64(total)
-       }, 60*time.Second, 100*time.Millisecond, "series index not fully 
persisted after stop")
+       }, 90*time.Second, time.Second, "series index not fully persisted 
before stop")

Review Comment:
   The test ignores `ReadOnlyDocCount` errors (`count, _ := ...`). If the 
reader intermittently fails to open/read (e.g., transient I/O errors), the test 
will treat it as `count=0` and potentially mask the real failure mode, making 
diagnosis harder. Handle `err` explicitly inside the poll (e.g., return false 
on error and optionally `t.Logf` the error) so persistent read errors fail with 
a clearer signal.



##########
banyand/internal/storage/segment.go:
##########
@@ -1046,6 +1046,15 @@ func (sc *segmentController[T, O]) close() {
        // also has its directory removed; the rest keep their data on disk.
        for _, s := range sc.lst {
                s.mu.Lock()
+               // Persist pending in-memory series documents before closing so 
an offline
+               // reader (dump/migration) sees a complete index; skip segments 
flagged for
+               // deletion since their directory is about to be removed. This 
runs only on
+               // shutdown, not on the idle-reclaim/delete paths.
+               if s.index != nil && atomic.LoadUint32(&s.mustBeDeleted) == 0 {
+                       if err := s.index.Flush(); err != nil {
+                               s.l.Warn().Err(err).Msg("failed to flush the 
series index on shutdown")
+                       }
+               }

Review Comment:
   `s.index.Flush()` is potentially blocking I/O and is executed while holding 
`s.mu.Lock()`. This can unnecessarily serialize shutdown and prolong lock hold 
times. If `close()` can run while other goroutines are still trying to acquire 
`s.mu` (even during shutdown), it increases the risk of shutdown stalls. 
Consider capturing `idx := s.index` and the delete flag under the lock, 
releasing the lock to perform `idx.Flush()`, then re-locking to proceed with 
`closeResourcesLocked()`.



##########
pkg/index/inverted/inverted.go:
##########
@@ -297,6 +297,27 @@ func (s *store) Reset() {
        s.writer.ResetCache()
 }
 
+// Flush forces every document currently held in memory to be persisted to disk
+// and blocks until that persistence completes. The bluge writer's Close does 
not
+// persist pending in-memory segments, so callers needing a durable on-disk 
index
+// after a graceful stop -- e.g. an offline reader such as the dump/migration
+// tools -- must Flush first. It submits an empty batch carrying a persisted
+// callback; because the persister always advances the epoch, the callback 
fires
+// once the root snapshot (covering all earlier in-memory segments) is on disk.
+func (s *store) Flush() error {
+       if !s.closer.AddRunning() {
+               return nil
+       }
+       defer s.closer.Done()
+       b := bluge.NewBatch()
+       ch := make(chan error, 1)
+       b.SetPersistedCallback(func(err error) { ch <- err })

Review Comment:
   The persisted callback writes to a buffered channel of size 1. If the 
callback can ever be invoked more than once (e.g., multiple persist events or 
retry/error paths), the second send would block and could deadlock the 
persister goroutine. Safer patterns here are: guard the send with `sync.Once`, 
or use a non-blocking send (select with default) and close/cleanup signaling so 
the callback can never block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to