Copilot commented on code in PR #1188:
URL: https://github.com/apache/skywalking-banyandb/pull/1188#discussion_r3458209688
##########
pkg/index/inverted/inverted.go:
##########
@@ -297,6 +297,27 @@ func (s *store) Reset() {
s.writer.ResetCache()
}
+// Flush forces every document currently held in memory to be persisted to disk
+// and blocks until that persistence completes. The bluge writer's Close does not
+// persist pending in-memory segments, so callers needing a durable on-disk index
+// after a graceful stop -- e.g. an offline reader such as the dump/migration
+// tools -- must Flush first. It submits an empty batch carrying a persisted
+// callback; because the persister always advances the epoch, the callback fires
+// once the root snapshot (covering all earlier in-memory segments) is on disk.
+func (s *store) Flush() error {
+ if !s.closer.AddRunning() {
+ return nil
+ }
+ defer s.closer.Done()
+ b := bluge.NewBatch()
+ ch := make(chan error, 1)
+ b.SetPersistedCallback(func(err error) { ch <- err })
+ if err := s.writer.Batch(b); err != nil {
+ return err
+ }
+ return <-ch
+}
Review Comment:
`Flush()` silently returns `nil` when `AddRunning()` fails (likely meaning
the store is closing/closed). That can report a successful flush even though no
persistence barrier was executed, reintroducing the durability gap in shutdown
races. Prefer returning a non-nil error (e.g., a package-level `ErrClosed` or
`ErrClosing` sentinel) so callers can surface/log it, or adjust the closer
semantics so `Flush()` can still run (or explicitly block) during a graceful
shutdown sequence.
##########
banyand/internal/dump/measure/suite_test.go:
##########
@@ -353,16 +353,19 @@ func TestMeasureIndexedTagResolvedFromIndex(t *testing.T) {
segmentPath := findSidxSegmentPath(t, rootPath)
- // The series index persists asynchronously (unsafe batches + persister) and is
- // flushed on stop; after a hard stop there is a brief window before all series
- // are readable on disk. The fallback scan below sources EntityValues purely
- // from this index, so wait until every written series is visible before
- // asserting, otherwise the scan can race the flush and recover nothing.
+ // Wait until every written series has been ingested into the segment's series
+ // index before stopping the service. The fallback scan below sources
+ // EntityValues purely from this index, so the writes must have landed first;
+ // stopping the service then flushes the index durably to disk (the shutdown
+ // path in segmentController.close). Poll at a coarse interval: opening a fresh
+ // read-only reader is not free and hammering it competes for CPU/IO with the
+ // persister goroutine it is waiting on, which can starve the persister under
+ // the loaded -race CI run.
sidxPath := filepath.Join(segmentPath, "sidx")
require.Eventually(t, func() bool {
count, _ := inverted.ReadOnlyDocCount(sidxPath)
return count >= int64(total)
- }, 60*time.Second, 100*time.Millisecond, "series index not fully persisted after stop")
+ }, 90*time.Second, time.Second, "series index not fully persisted before stop")
Review Comment:
The test ignores `ReadOnlyDocCount` errors (`count, _ := ...`). If the
reader intermittently fails to open/read (e.g., transient I/O errors), the test
will treat it as `count=0` and potentially mask the real failure mode, making
diagnosis harder. Handle `err` explicitly inside the poll (e.g., return false
on error and optionally `t.Logf` the error) so persistent read errors fail with
a clearer signal.
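A poll predicate that follows this suggestion might look like the sketch below. `docCountReady` and `flakyReader` are hypothetical helpers, not part of the PR: `read` stands in for `inverted.ReadOnlyDocCount(sidxPath)`, `logf` for `t.Logf`, and `flakyReader` is a fake reader used only to exercise the error path.

```go
package main

import "fmt"

// docCountReady is a hypothetical predicate for require.Eventually. A read
// error is logged and treated as "not ready yet" rather than as count == 0,
// so a persistently failing reader leaves a trail in the test log.
func docCountReady(read func() (int64, error), want int64, logf func(string, ...any)) bool {
	count, err := read()
	if err != nil {
		logf("reading series index doc count: %v", err)
		return false
	}
	return count >= want
}

// flakyReader is a hypothetical fake reader: it fails the first `failures`
// calls with a transient error, then reports `count` documents.
func flakyReader(failures int, count int64) func() (int64, error) {
	calls := 0
	return func() (int64, error) {
		calls++
		if calls <= failures {
			return 0, fmt.Errorf("transient read error (call %d)", calls)
		}
		return count, nil
	}
}
```

Inside the test, the predicate would wrap the real reader, e.g. `docCountReady(func() (int64, error) { return inverted.ReadOnlyDocCount(sidxPath) }, int64(total), t.Logf)`. This assumes `ReadOnlyDocCount` returns `(int64, error)`, as the `count, _ :=` form and the `int64(total)` comparison suggest.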
##########
banyand/internal/storage/segment.go:
##########
@@ -1046,6 +1046,15 @@ func (sc *segmentController[T, O]) close() {
// also has its directory removed; the rest keep their data on disk.
for _, s := range sc.lst {
s.mu.Lock()
+ // Persist pending in-memory series documents before closing so an offline
+ // reader (dump/migration) sees a complete index; skip segments flagged for
+ // deletion since their directory is about to be removed. This runs only on
+ // shutdown, not on the idle-reclaim/delete paths.
+ if s.index != nil && atomic.LoadUint32(&s.mustBeDeleted) == 0 {
+ if err := s.index.Flush(); err != nil {
+ s.l.Warn().Err(err).Msg("failed to flush the series index on shutdown")
+ }
+ }
Review Comment:
`s.index.Flush()` is potentially blocking I/O and is executed while holding
`s.mu.Lock()`. This can unnecessarily serialize shutdown and prolong lock hold
times. If `close()` can run while other goroutines are still trying to acquire
`s.mu` (even during shutdown), it increases the risk of shutdown stalls.
Consider capturing `idx := s.index` and the delete flag under the lock,
releasing the lock to perform `idx.Flush()`, then re-locking to proceed with
`closeResourcesLocked()`.
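The snapshot-unlock-flush-relock pattern suggested here is sketched below under simplifying assumptions. `segment`, `flusher`, `flushFunc` and `closeForShutdown` are hypothetical stand-ins; only the `mustBeDeleted` flag, `Flush()` and the `closeResourcesLocked()` name come from the diff and the review.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// flusher is a hypothetical stand-in for the segment's series index.
type flusher interface{ Flush() error }

// flushFunc adapts a plain function to the flusher interface.
type flushFunc func() error

func (f flushFunc) Flush() error { return f() }

// segment is a simplified, hypothetical stand-in for the PR's segment type.
type segment struct {
	mu            sync.Mutex
	index         flusher
	mustBeDeleted uint32
	closed        bool
}

// closeResourcesLocked mirrors the name used in the review; caller holds s.mu.
func (s *segment) closeResourcesLocked() {
	s.closed = true
	s.index = nil
}

// closeForShutdown snapshots the index and delete flag under the lock,
// performs the potentially slow Flush without holding s.mu, then re-locks
// to release resources.
func (s *segment) closeForShutdown(warn func(error)) {
	s.mu.Lock()
	idx := s.index
	deleting := atomic.LoadUint32(&s.mustBeDeleted) != 0
	s.mu.Unlock()

	if idx != nil && !deleting {
		if err := idx.Flush(); err != nil {
			warn(err)
		}
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.closeResourcesLocked()
}
```

The trade-off is that the lock is not held during `Flush`, so this assumes nothing else can close or swap `s.index` in that window during shutdown; if that cannot be guaranteed, the index would need a reference count or a similar guard.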
##########
pkg/index/inverted/inverted.go:
##########
@@ -297,6 +297,27 @@ func (s *store) Reset() {
s.writer.ResetCache()
}
+// Flush forces every document currently held in memory to be persisted to disk
+// and blocks until that persistence completes. The bluge writer's Close does not
+// persist pending in-memory segments, so callers needing a durable on-disk index
+// after a graceful stop -- e.g. an offline reader such as the dump/migration
+// tools -- must Flush first. It submits an empty batch carrying a persisted
+// callback; because the persister always advances the epoch, the callback fires
+// once the root snapshot (covering all earlier in-memory segments) is on disk.
+func (s *store) Flush() error {
+ if !s.closer.AddRunning() {
+ return nil
+ }
+ defer s.closer.Done()
+ b := bluge.NewBatch()
+ ch := make(chan error, 1)
+ b.SetPersistedCallback(func(err error) { ch <- err })
Review Comment:
The persisted callback writes to a buffered channel of size 1. If the
callback can ever be invoked more than once (e.g., multiple persist events or
retry/error paths), the second send would block and could deadlock the
persister goroutine. Safer patterns here are: guard the send with `sync.Once`,
or use a non-blocking send (select with default) and close/cleanup signaling so
the callback can never block.
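Both patterns named in this comment can be sketched as small callback factories. `onceCallback` and `nonBlockingCallback` are hypothetical helpers, not bluge or BanyanDB APIs; each returns a callback suitable for passing to `SetPersistedCallback` and the channel that `Flush` would wait on.

```go
package main

import "sync"

// onceCallback returns a callback that delivers at most one result into a
// one-slot buffered channel. sync.Once guarantees the single send happens
// into an empty buffer, so the callback never blocks; later calls are no-ops.
func onceCallback() (func(error), <-chan error) {
	ch := make(chan error, 1)
	var once sync.Once
	return func(err error) {
		once.Do(func() { ch <- err })
	}, ch
}

// nonBlockingCallback returns a callback that uses select/default: if a
// result is already pending in the buffer, the new one is dropped instead
// of blocking the caller (e.g. a persister goroutine).
func nonBlockingCallback() (func(error), <-chan error) {
	ch := make(chan error, 1)
	return func(err error) {
		select {
		case ch <- err:
		default: // a result is already buffered; drop this one
		}
	}, ch
}
```

In `Flush` the wiring would be `cb, ch := onceCallback()` followed by `b.SetPersistedCallback(cb)` and `return <-ch`. The `sync.Once` variant always keeps the first result; the `select`/`default` variant only drops a result while an earlier one is still unread, which is enough to keep the callback from blocking.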
--
This is an automated message from the Apache Git Service.