hanahmily commented on code in PR #692:
URL: https://github.com/apache/skywalking-banyandb/pull/692#discussion_r2192379804


##########
banyand/property/repair.go:
##########
@@ -0,0 +1,853 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "bufio"
+       "crypto/sha512"
+       "encoding/binary"
+       "encoding/json"
+       "fmt"
+       "hash"
+       "io"
+       "os"
+       "path"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/benbjohnson/clock"
+       "github.com/blugelabs/bluge"
+       "github.com/blugelabs/bluge/index"
+       "github.com/cespare/xxhash/v2"
+       "github.com/pkg/errors"
+       "github.com/robfig/cron/v3"
+       "go.uber.org/multierr"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const repairBatchSearchSize = 100
+
+type repair struct {
+       latestBuildTreeSchedule   time.Time
+       buildTreeClock            clock.Clock
+       closer                    *run.Closer
+       repairTreeScheduler       *timestamp.Scheduler
+       metrics                   *repairMetrics
+       l                         *logger.Logger
+       quickRepairNotified       *int32
+       takeSnapshot              func(string) error
+       shardPath                 string
+       repairBasePath            string
+       snapshotDir               string
+       statePath                 string
+       composeSlotAppendFilePath string
+       composeTreeFilePathFmt    string
+       treeSlotCount             int
+       batchSearchSize           int
+       buildTreeScheduleInterval time.Duration
+       repairQuickBuildTreeTime  time.Duration
+       buildTreeLocker           sync.Mutex
+}
+
+func newRepair(
+       shardPath string,
+       l *logger.Logger,
+       metricsFactory *observability.Factory,
+       batchSearchSize,
+       treeSlotCount int,
+       repairBuildTreeCron string,
+       repairQuickBuildTreeTime time.Duration,
+       takeSnapshot func(string) error,
+) (r *repair, err error) {
+       repairBase := path.Join(shardPath, "repair")
+       var quickRepairNotified int32
+       r = &repair{
+               shardPath:                 shardPath,
+               l:                         l,
+               repairBasePath:            repairBase,
+               snapshotDir:               path.Join(repairBase, "snapshot"),
+               statePath:                 path.Join(repairBase, "state.json"),
+               composeSlotAppendFilePath: path.Join(repairBase, "state-append-%d.tmp"),
+               composeTreeFilePathFmt:    path.Join(repairBase, "state-tree-%s.data"),
+               treeSlotCount:             treeSlotCount,
+               batchSearchSize:           batchSearchSize,
+               takeSnapshot:              takeSnapshot,
+               repairQuickBuildTreeTime:  repairQuickBuildTreeTime,
+               quickRepairNotified:       &quickRepairNotified,
+               metrics:                   newRepairMetrics(metricsFactory),
+               closer:                    run.NewCloser(1),
+       }
+       if err = r.initScheduler(repairBuildTreeCron); err != nil {
+               return nil, fmt.Errorf("init scheduler: %w", err)
+       }
+       return r, nil
+}
+
+func (r *repair) initScheduler(exp string) error {
+       r.buildTreeClock = clock.New()
+       c := timestamp.NewScheduler(r.l, r.buildTreeClock)
+       r.repairTreeScheduler = c
+       err := c.Register("repair", cron.Descriptor, exp, func(t time.Time, _ *logger.Logger) bool {
+               r.doRepairScheduler(t, true)
+               return true
+       })
+       if err != nil {
+               return fmt.Errorf("failed to add repair build tree cron task: %w", err)
+       }
+       interval, nextTime, exist := c.Interval("repair")
+       if !exist {
+               return fmt.Errorf("failed to get repair build tree cron task interval")
+       }
+       r.buildTreeScheduleInterval = interval
+       r.latestBuildTreeSchedule = nextTime.Add(-interval)
+       return nil
+}
+
+func (r *repair) documentUpdatesNotify() {
+       if !atomic.CompareAndSwapInt32(r.quickRepairNotified, 0, 1) {
+               return
+       }
+
+       go func() {
+               select {
+               case <-r.closer.CloseNotify():
+                       return
+               case <-time.After(r.repairQuickBuildTreeTime):
+                       r.doRepairScheduler(r.buildTreeClock.Now(), false)
+                       // reset the notified flag to allow the next notification
+                       atomic.StoreInt32(r.quickRepairNotified, 0)
+               }
+       }()
+}
+
+func (r *repair) doRepairScheduler(t time.Time, triggerByCron bool) {
+       if !triggerByCron {
+               // if not triggered by cron, we need to check if the time is after the (last scheduled time + half of the interval)
+               if r.buildTreeClock.Now().After(r.latestBuildTreeSchedule.Add(r.buildTreeScheduleInterval / 2)) {
+                       return
+               }
+       } else {
+               r.latestBuildTreeSchedule = t
+       }
+
+       // if already building the tree, skip this run
+       if !r.buildTreeLocker.TryLock() {
+               return
+       }
+       defer r.buildTreeLocker.Unlock()
+       err := r.buildStatus()
+       if err != nil {
+               r.l.Err(fmt.Errorf("repair build status failure: %w", err))
+       }
+}
+
+func (r *repair) buildStatus() (err error) {
+       startTime := time.Now()
+       defer func() {
+               r.metrics.totalBuildTreeFinished.Inc(1)
+               if err != nil {
+                       r.metrics.totalBuildTreeFailures.Inc(1)
+               }
+               r.metrics.totalBuildTreeDuration.Inc(time.Since(startTime).Seconds())
+       }()
+       var state *repairStatus
+       // reading the state file to check they have any updates
+       state, err = r.readState()
+       if err != nil {
+               return fmt.Errorf("reading state failure: %w", err)
+       }
+       indexConfig := index.DefaultConfig(r.shardPath)
+       items, err := indexConfig.DirectoryFunc().List(index.ItemKindSegment)
+       if err != nil {
+               return fmt.Errorf("reading item kind segment failure: %w", err)
+       }
+       sort.Sort(snapshotIDList(items))
+       // check the snapshot ID have any updated
+       // if no updates, the building Trees should be skipped
+       if state != nil && len(items) != 0 && items[len(items)-1] == state.LastSnpID {
+               return nil
+       }
+       if len(items) == 0 {
+               return nil
+       }
+
+       // otherwise, we need to building the trees
+       // take a new snapshot first
+       err = r.takeSnapshot(r.snapshotDir)
+       if err != nil {
+               return fmt.Errorf("taking snapshot failure: %w", err)
+       }
+       blugeConf := bluge.DefaultConfig(r.snapshotDir)
+       err = r.buildTree(blugeConf)
+       if err != nil {
+               return fmt.Errorf("building trees failure: %w", err)
+       }
+
+       var latestSnapshotID uint64
+       if len(items) > 0 {
+               latestSnapshotID = items[len(items)-1]
+       }
+       // save the Trees to the state
+       state = &repairStatus{
+               LastSnpID:    latestSnapshotID,
+               LastSyncTime: time.Now(),
+       }
+       stateVal, err := json.Marshal(state)
+       if err != nil {
+               return fmt.Errorf("marshall state failure: %w", err)
+       }
+       err = os.WriteFile(r.statePath, stateVal, storage.FilePerm)
+       if err != nil {
+               return fmt.Errorf("writing state file failure: %w", err)
+       }
+       return nil
+}
+
+func (r *repair) buildTree(conf bluge.Config) error {
+       reader, err := bluge.OpenReader(conf)
+       if err != nil {
+               return fmt.Errorf("opening index reader failure: %w", err)
+       }
+       defer func() {
+               _ = reader.Close()
+       }()
+       topNSearch := bluge.NewTopNSearch(r.batchSearchSize, bluge.NewMatchAllQuery())
+       topNSearch.SortBy([]string{
+               fmt.Sprintf("+%s", groupField),
+               fmt.Sprintf("+%s", nameField),
+               fmt.Sprintf("+%s", entityID),
+               fmt.Sprintf("+%s", timestampField),
+       })
+
+       var latestProperty *searchingProperty
+       treeComposer := newRepairTreeComposer(r.composeSlotAppendFilePath, r.composeTreeFilePathFmt, r.treeSlotCount, r.l)
+       if err != nil {
+               return fmt.Errorf("creating repair tree composer failure: %w", err)
+       }
+       err = r.pageSearch(reader, topNSearch, func(source []byte, shaValue string, deleteTime int64) error {
+               // building the entity
+               if len(source) == 0 {
+                       return nil
+               }
+               var property propertyv1.Property
+               err = protojson.Unmarshal(source, &property)
+               if err != nil {
+                       return err
+               }
+               entity := GetEntity(&property)
+               if shaValue == "" {
+                       shaValue, err = r.buildShaValue(source, deleteTime)
+                       if err != nil {
+                               return fmt.Errorf("building sha value failure: %w", err)
+                       }
+               }
+
+               s := newSearchingProperty(&property, shaValue, entity)
+               if latestProperty != nil {
+                       if latestProperty.group != property.Metadata.Group {
+                               // if the group have changed, we need to append the latest property to the tree composer, and compose builder
+                               // the entity is changed, need to save the property
+                               if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
+                                       return fmt.Errorf("appending property to tree composer failure: %w", err)
+                               }
+                               err = treeComposer.composeNextGroupAndSave(latestProperty.group)
+                               if err != nil {
+                                       return fmt.Errorf("composing group failure: %w", err)
+                               }
+                       } else if latestProperty.entityID != entity {
+                               // the entity is changed, need to save the property
+                               if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
+                                       return fmt.Errorf("appending property to tree composer failure: %w", err)
+                               }
+                               }
+                       }
+               }
+               latestProperty = s
+               return nil
+       })
+       if err != nil {
+               return err
+       }
+       // if the latestProperty is not nil, it means the latest property need to be saved
+       if latestProperty != nil {
+               if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
+                       return fmt.Errorf("appending latest property to tree composer failure: %w", err)
+               }
+               err = treeComposer.composeNextGroupAndSave(latestProperty.group)
+               if err != nil {
+                       return fmt.Errorf("composing last group failure: %w", err)
+               }
+       }
+       }
+
+       return nil
+}
+
+//nolint:contextcheck
+func (r *repair) pageSearch(reader *bluge.Reader, searcher *bluge.TopNSearch, each func(source []byte, shaValue string, deleteTime int64) error) error {
+       var latestDocValues [][]byte
+       for {
+               searcher.After(latestDocValues)
+               result, err := reader.Search(r.closer.Ctx(), searcher)
+               if err != nil {
+                       return fmt.Errorf("searching index failure: %w", err)
+               }
+
+               next, err := result.Next()
+               var hitNumber int
+               if err != nil {
+                       return errors.WithMessage(err, "iterate document match iterator")
+               }
+               // if next is nil, it means no more documents to process
+               if next == nil {
+                       return nil
+               }
+               var shaValue string
+               var source []byte
+               var deleteTime int64
+               for err == nil && next != nil {
+                       hitNumber = next.HitNumber
+                       var errTime error
+                       err = next.VisitStoredFields(func(field string, value []byte) bool {
+                               switch field {
+                               case shaValueField:
+                                       shaValue = convert.BytesToString(value)
+                               case sourceField:
+                                       source = value
+                               case deleteField:
+                                       deleteTime = convert.BytesToInt64(value)
+                               }
+                               return true
+                       })
+                       if err = multierr.Combine(err, errTime); err != nil {
+                               return errors.WithMessagef(err, "visit stored fields, hit: %d", hitNumber)
+                       }
+                       err = each(source, shaValue, deleteTime)
+                       if err != nil {
+                               return errors.WithMessagef(err, "processing source failure, hit: %d", hitNumber)
+                       }
+                       latestDocValues = next.SortValue
+                       next, err = result.Next()
+               }
+       }
+}
+
+func (r *repair) buildShaValue(source []byte, deleteTime int64) (string, error) {
+       var err error
+       hash := sha512.New()
+       _, err = hash.Write(source)
+       if err != nil {
+               return "", fmt.Errorf("hashing source failure: %w", err)
+       }
+       _, err = hash.Write([]byte(fmt.Sprintf("%d", deleteTime)))
+       if err != nil {
+               return "", fmt.Errorf("hashing delete time failure: %w", err)
+       }
+       return fmt.Sprintf("%x", hash.Sum(nil)), nil
+}
+
+func (r *repair) readState() (*repairStatus, error) {
+       stateFile, err := os.ReadFile(r.statePath)
+       if err != nil {
+               if os.IsNotExist(err) {
+                       return nil, nil
+               }
+               return nil, err
+       }
+       var status repairStatus
+       err = json.Unmarshal(stateFile, &status)
+       if err != nil {
+               return nil, err
+       }
+       return &status, nil
+}
+
+func (r *repair) close() {
+       r.closer.Done()
+       r.closer.CloseThenWait()
+       r.repairTreeScheduler.Close()
+}
+
+type repairStatus struct {
+       LastSyncTime time.Time `json:"last_sync_time"`
+       LastSnpID    uint64    `json:"last_snp_id"`
+}
+
+type repairTreeNodeType int
+
+const (
+       repairTreeNodeTypeRoot repairTreeNodeType = iota
+       repairTreeNodeTypeSlot
+       repairTreeNodeTypeLeaf
+)
+
+type repairTreeNode struct {
+       shaValue  string
+       id        string
+       tp        repairTreeNodeType
+       leafStart int64
+       leafLen   int64
+}
+type repairTreeReader struct {
+       file   *os.File
+       reader *bufio.Reader
+       footer *repairTreeFooter
+}
+
+func (r *repair) treeReader(group string) (*repairTreeReader, error) {
+       groupFile := fmt.Sprintf(r.composeTreeFilePathFmt, group)
+       file, err := os.OpenFile(groupFile, os.O_RDONLY, os.ModePerm)
+       if err != nil {
+               if errors.Is(err, os.ErrNotExist) {
+                       // if the file does not exist, it means no repair tree for this group
+                       return nil, nil
+               }
+               return nil, fmt.Errorf("opening repair tree file %s failure: %w", group, err)
+       }
+       reader := &repairTreeReader{
+               file:   file,
+               reader: bufio.NewReader(file),
+       }
+       if err = reader.readFoot(); err != nil {
+               _ = file.Close()
+               return nil, fmt.Errorf("reading footer from repair tree file %s failure: %w", groupFile, err)
+       }
+       return reader, nil
+}
+
+func (r *repairTreeReader) readFoot() error {
+       stat, err := r.file.Stat()
+       if err != nil {
+               return fmt.Errorf("getting file stat for %s failure: %w", r.file.Name(), err)
+       }
+       footerOffset := stat.Size() - 1
+       if _, err = r.file.Seek(footerOffset, io.SeekStart); err != nil {
+               return fmt.Errorf("seeking to footer offset %d in file %s failure: %w", footerOffset, r.file.Name(), err)
+       }
+       var footLen uint8
+       if err = binary.Read(r.file, binary.LittleEndian, &footLen); err != nil {
+               return fmt.Errorf("reading footer length from file %s failure: %w", r.file.Name(), err)
+       }
+       _, err = r.file.Seek(-(int64(footLen) + 1), io.SeekCurrent)
+       if err != nil {
+               return fmt.Errorf("seeking to start of footer in file %s failure: %w", r.file.Name(), err)
+       }
+       footerBytes := make([]byte, footLen)
+       if _, err = io.ReadFull(r.file, footerBytes); err != nil {
+               return fmt.Errorf("reading footer from file %s failure: %w", r.file.Name(), err)
+       }
+       footerData := make([]int64, 4)
+       _, err = encoding.BytesToVarInt64List(footerData, footerBytes)
+       if err != nil {
+               return fmt.Errorf("decoding footer from file %s failure: %w", r.file.Name(), err)
+       }
+       r.footer = &repairTreeFooter{
+               leafNodeFinishedOffset: footerData[0],
+               slotNodeLen:            footerData[1],
+               slotNodeFinishedOffset: footerData[2],
+               rootNodeLen:            footerData[3],
+       }
+       return nil
+}
+
+func (r *repairTreeReader) seekPosition(offset int64, whence int) error {
+       _, err := r.file.Seek(offset, whence)
+       if err != nil {
+               return fmt.Errorf("seeking position failure: %w", err)
+       }
+       r.reader.Reset(r.file)
+       return nil
+}
+
+func (r *repairTreeReader) read(parent *repairTreeNode) ([]*repairTreeNode, error) {
+       if parent == nil {
+               // reading the root node
+               err := r.seekPosition(r.footer.slotNodeFinishedOffset, io.SeekStart)
+               if err != nil {
+                       return nil, fmt.Errorf("seeking to root node offset %d in file %s failure: %w", r.footer.slotNodeFinishedOffset, r.file.Name(), err)
+               }
+               rootDataBytes := make([]byte, r.footer.rootNodeLen)
+               if _, err = io.ReadFull(r.reader, rootDataBytes); err != nil {
+                       return nil, fmt.Errorf("reading root node data from file %s failure: %w", r.file.Name(), err)
+               }
+               _, shaValue, err := encoding.DecodeBytes(rootDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("decoding root node sha value from file %s failure: %w", r.file.Name(), err)
+               }
+               return []*repairTreeNode{
+                       {
+                               shaValue: fmt.Sprintf("%x", shaValue),
+                               tp:       repairTreeNodeTypeRoot,
+                       },
+               }, nil
+       }
+
+       var err error
+       nodes := make([]*repairTreeNode, 0)
+       if parent.tp == repairTreeNodeTypeRoot {
+               // reading the slot nodes
+               if err = r.seekPosition(r.footer.leafNodeFinishedOffset, io.SeekStart); err != nil {
+                       return nil, fmt.Errorf("seeking to slot node offset %d in file %s failure: %w", r.footer.leafNodeFinishedOffset, r.file.Name(), err)
+               }
+               slotDataBytes := make([]byte, r.footer.slotNodeLen)
+               if _, err = io.ReadFull(r.reader, slotDataBytes); err != nil {
+                       return nil, fmt.Errorf("reading slot node data from file %s failure: %w", r.file.Name(), err)
+               }
+               reduceBytesLen := int64(len(slotDataBytes))
+
+               var slotNodeIndex, leafStartOff, leafLen int64
+               var slotShaVal []byte
+               for reduceBytesLen > 0 {
+                       slotDataBytes, slotNodeIndex, err = encoding.BytesToVarInt64(slotDataBytes)
+                       if err != nil {
+                               return nil, fmt.Errorf("decoding slot node index from file %s failure: %w", r.file.Name(), err)
+                       }
+                       slotDataBytes, slotShaVal, err = encoding.DecodeBytes(slotDataBytes)
+                       if err != nil {
+                               return nil, fmt.Errorf("decoding slot node sha value from file %s failure: %w", r.file.Name(), err)
+                       }
+                       slotDataBytes, leafStartOff, err = encoding.BytesToVarInt64(slotDataBytes)
+                       if err != nil {
+                               return nil, fmt.Errorf("decoding slot node leaf start offset from file %s failure: %w", r.file.Name(), err)
+                       }
+                       slotDataBytes, leafLen, err = encoding.BytesToVarInt64(slotDataBytes)
+                       if err != nil {
+                               return nil, fmt.Errorf("decoding slot node leaf length from file %s failure: %w", r.file.Name(), err)
+                       }
+                       reduceBytesLen = int64(len(slotDataBytes))
+                       nodes = append(nodes, &repairTreeNode{
+                               shaValue:  fmt.Sprintf("%x", slotShaVal),
+                               id:        fmt.Sprintf("%d", slotNodeIndex),
+                               tp:        repairTreeNodeTypeSlot,
+                               leafStart: leafStartOff,
+                               leafLen:   leafLen,
+                       })
+               }
+               return nodes, nil
+       } else if parent.tp == repairTreeNodeTypeLeaf {
+               return nil, nil
+       }
+
+       // otherwise, reading the leaf nodes
+       err = r.seekPosition(parent.leafStart, io.SeekStart)
+       if err != nil {
+               return nil, fmt.Errorf("seeking to leaf node offset %d in file %s failure: %w", r.footer.leafNodeFinishedOffset, r.file.Name(), err)
+       }
+       leafDataBytes := make([]byte, parent.leafLen)
+       if _, err = io.ReadFull(r.reader, leafDataBytes); err != nil {
+               return nil, fmt.Errorf("reading leaf node data from file %s failure: %w", r.file.Name(), err)
+       }
+       reduceBytesLen := int64(len(leafDataBytes))
+       for reduceBytesLen > 0 {
+               var entity, shaVal []byte
+               leafDataBytes, entity, err = encoding.DecodeBytes(leafDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("decoding leaf node entity from file %s failure: %w", r.file.Name(), err)
+               }
+               leafDataBytes, shaVal, err = encoding.DecodeBytes(leafDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("decoding leaf node sha value from file %s failure: %w", r.file.Name(), err)
+               }
+               reduceBytesLen = int64(len(leafDataBytes))
+               nodes = append(nodes, &repairTreeNode{

Review Comment:
   You should use a streaming method to load the leaves. Loading all of the data into memory at once will cause OOM errors. Providing a readLine method for retrieving leaves is a better solution.
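   
   For illustration, a minimal sketch of what a streaming, readLine-style leaf iterator could look like. It assumes the leaf entries are stored as uvarint length-prefixed byte blocks (an assumption about the pkg/encoding layout, not verified here), and the helper names are hypothetical:
   
   ```go
// repairLeafIterator yields one leaf node per call instead of loading the
// whole leaf region of a slot into memory, so usage stays bounded by a
// single entry.
type repairLeafIterator struct {
	reader    *bufio.Reader
	remaining int64 // bytes left to consume in the parent slot's leaf region
}

func newRepairLeafIterator(r *repairTreeReader, parent *repairTreeNode) (*repairLeafIterator, error) {
	if err := r.seekPosition(parent.leafStart, io.SeekStart); err != nil {
		return nil, err
	}
	return &repairLeafIterator{reader: r.reader, remaining: parent.leafLen}, nil
}

// next reads a single (entity, sha) pair; it returns io.EOF once the slot's
// leaf region is exhausted.
func (it *repairLeafIterator) next() (*repairTreeNode, error) {
	if it.remaining <= 0 {
		return nil, io.EOF
	}
	entity, n, err := readLengthPrefixed(it.reader)
	if err != nil {
		return nil, err
	}
	it.remaining -= n
	shaVal, n, err := readLengthPrefixed(it.reader)
	if err != nil {
		return nil, err
	}
	it.remaining -= n
	return &repairTreeNode{
		id:       string(entity),
		shaValue: fmt.Sprintf("%x", shaVal),
		tp:       repairTreeNodeTypeLeaf,
	}, nil
}

// readLengthPrefixed is a hypothetical helper: it reads a uvarint length and
// then that many bytes, returning the payload and the total bytes consumed.
func readLengthPrefixed(r *bufio.Reader) ([]byte, int64, error) {
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, 0, err
	}
	buf := make([]byte, size)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, 0, err
	}
	prefixLen := 1
	for v := size; v >= 0x80; v >>= 7 {
		prefixLen++
	}
	return buf, int64(size) + int64(prefixLen), nil
}
   ```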



##########
banyand/property/repair.go:
##########
@@ -0,0 +1,853 @@
+       var state *repairStatus
+       // reading the state file to check they have any updates
+       state, err = r.readState()
+       if err != nil {
+               return fmt.Errorf("reading state failure: %w", err)
+       }
+       indexConfig := index.DefaultConfig(r.shardPath)
+       items, err := indexConfig.DirectoryFunc().List(index.ItemKindSegment)

Review Comment:
   ```suggestion
        items, err := indexConfig.DirectoryFunc().List(index.ItemKindSnapshot)
   ```
   
   There is only 1 *.snp file there.
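   
   For context, a sketch of how the listing and the no-change check could then read (combining this suggestion with the surrounding code; the ordering of the early returns is only illustrative):
   
   ```go
// list the shard's snapshot items (typically a single *.snp entry) and skip
// rebuilding the trees when the newest snapshot ID matches the saved state
items, err := indexConfig.DirectoryFunc().List(index.ItemKindSnapshot)
if err != nil {
	return fmt.Errorf("listing snapshots failure: %w", err)
}
sort.Sort(snapshotIDList(items))
if len(items) == 0 || (state != nil && items[len(items)-1] == state.LastSnpID) {
	return nil
}
   ```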



##########
banyand/property/repair.go:
##########
@@ -0,0 +1,853 @@
+               s := newSearchingProperty(&property, shaValue, entity)
+               if latestProperty != nil {

Review Comment:
   If there is only one property, you might lose it.
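   
   For illustration, a sketch of the sweep-and-flush shape that avoids dropping anything, including a stream that contains exactly one property (field names follow the surrounding code; the composer semantics are assumed, not verified):
   
   ```go
err = r.pageSearch(reader, topNSearch, func(source []byte, shaValue string, deleteTime int64) error {
	// ... decode the property, derive entity/shaValue and build s as above ...
	if latestProperty != nil && latestProperty.entityID != s.entityID {
		// the previous entity is complete: hand it to the composer
		if err := treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
			return err
		}
		if latestProperty.group != s.group {
			// the previous group is complete as well
			if err := treeComposer.composeNextGroupAndSave(latestProperty.group); err != nil {
				return err
			}
		}
	}
	latestProperty = s
	return nil
})
if err != nil {
	return err
}
// unconditional final flush: a single-property stream is still appended and
// its group composed here
if latestProperty != nil {
	if err := treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
		return err
	}
	if err := treeComposer.composeNextGroupAndSave(latestProperty.group); err != nil {
		return err
	}
}
   ```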



##########
banyand/property/repair.go:
##########
@@ -0,0 +1,853 @@
+       // otherwise, we need to building the trees
+       // take a new snapshot first
+       err = r.takeSnapshot(r.snapshotDir)

Review Comment:
   ```suggestion
        snapshotDir, err = r.takeSnapshot()
   ```
   
   The snapshot directory has a dynamic path that includes a creation timestamp.
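   
   A sketch of how the field and the call site could change if takeSnapshot reports the directory it created (the signature matches the suggestion above; the rest is illustrative):
   
   ```go
// the callback now returns the timestamped snapshot directory it created
takeSnapshot func() (string, error)

// in buildStatus: point the bluge reader at the directory that was returned
var snapshotDir string
if snapshotDir, err = r.takeSnapshot(); err != nil {
	return fmt.Errorf("taking snapshot failure: %w", err)
}
blugeConf := bluge.DefaultConfig(snapshotDir)
if err = r.buildTree(blugeConf); err != nil {
	return fmt.Errorf("building trees failure: %w", err)
}
   ```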



##########
banyand/property/repair.go:
##########
@@ -0,0 +1,853 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "bufio"
+       "crypto/sha512"
+       "encoding/binary"
+       "encoding/json"
+       "fmt"
+       "hash"
+       "io"
+       "os"
+       "path"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/benbjohnson/clock"
+       "github.com/blugelabs/bluge"
+       "github.com/blugelabs/bluge/index"
+       "github.com/cespare/xxhash/v2"
+       "github.com/pkg/errors"
+       "github.com/robfig/cron/v3"
+       "go.uber.org/multierr"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const repairBatchSearchSize = 100
+
+type repair struct {
+       latestBuildTreeSchedule   time.Time
+       buildTreeClock            clock.Clock
+       closer                    *run.Closer
+       repairTreeScheduler       *timestamp.Scheduler
+       metrics                   *repairMetrics
+       l                         *logger.Logger
+       quickRepairNotified       *int32
+       takeSnapshot              func(string) error
+       shardPath                 string
+       repairBasePath            string
+       snapshotDir               string
+       statePath                 string
+       composeSlotAppendFilePath string
+       composeTreeFilePathFmt    string
+       treeSlotCount             int
+       batchSearchSize           int
+       buildTreeScheduleInterval time.Duration
+       repairQuickBuildTreeTime  time.Duration
+       buildTreeLocker           sync.Mutex
+}
+
+func newRepair(
+       shardPath string,
+       l *logger.Logger,
+       metricsFactory *observability.Factory,
+       batchSearchSize,
+       treeSlotCount int,
+       repairBuildTreeCron string,
+       repairQuickBuildTreeTime time.Duration,
+       takeSnapshot func(string) error,
+) (r *repair, err error) {
+       repairBase := path.Join(shardPath, "repair")
+       var quickRepairNotified int32
+       r = &repair{
+               shardPath:                 shardPath,
+               l:                         l,
+               repairBasePath:            repairBase,
+               snapshotDir:               path.Join(repairBase, "snapshot"),
+               statePath:                 path.Join(repairBase, "state.json"),
+               composeSlotAppendFilePath: path.Join(repairBase, 
"state-append-%d.tmp"),
+               composeTreeFilePathFmt:    path.Join(repairBase, 
"state-tree-%s.data"),
+               treeSlotCount:             treeSlotCount,
+               batchSearchSize:           batchSearchSize,
+               takeSnapshot:              takeSnapshot,
+               repairQuickBuildTreeTime:  repairQuickBuildTreeTime,
+               quickRepairNotified:       &quickRepairNotified,
+               metrics:                   newRepairMetrics(metricsFactory),
+               closer:                    run.NewCloser(1),
+       }
+       if err = r.initScheduler(repairBuildTreeCron); err != nil {
+               return nil, fmt.Errorf("init scheduler: %w", err)
+       }
+       return r, nil
+}
+
+func (r *repair) initScheduler(exp string) error {
+       r.buildTreeClock = clock.New()
+       c := timestamp.NewScheduler(r.l, r.buildTreeClock)
+       r.repairTreeScheduler = c
+       err := c.Register("repair", cron.Descriptor, exp, func(t time.Time, _ *logger.Logger) bool {
+               r.doRepairScheduler(t, true)
+               return true
+       })
+       if err != nil {
+               return fmt.Errorf("failed to add repair build tree cron task: %w", err)
+       }
+       interval, nextTime, exist := c.Interval("repair")
+       if !exist {
+               return fmt.Errorf("failed to get repair build tree cron task interval")
+       }
+       r.buildTreeScheduleInterval = interval
+       r.latestBuildTreeSchedule = nextTime.Add(-interval)
+       return nil
+}
+
+func (r *repair) documentUpdatesNotify() {
+       if !atomic.CompareAndSwapInt32(r.quickRepairNotified, 0, 1) {
+               return
+       }
+
+       go func() {
+               select {
+               case <-r.closer.CloseNotify():
+                       return
+               case <-time.After(r.repairQuickBuildTreeTime):
+                       r.doRepairScheduler(r.buildTreeClock.Now(), false)
+                       // reset the notified flag to allow the next notification
+                       atomic.StoreInt32(r.quickRepairNotified, 0)
+               }
+       }()
+}
+
+func (r *repair) doRepairScheduler(t time.Time, triggerByCron bool) {
+       if !triggerByCron {
+               // when not triggered by cron, skip if the current time is already past the last scheduled time plus half of the schedule interval
+               if r.buildTreeClock.Now().After(r.latestBuildTreeSchedule.Add(r.buildTreeScheduleInterval / 2)) {
+                       return
+               }
+       } else {
+               r.latestBuildTreeSchedule = t
+       }
+
+       // if already building the tree, skip this run
+       if !r.buildTreeLocker.TryLock() {
+               return
+       }
+       defer r.buildTreeLocker.Unlock()
+       err := r.buildStatus()
+       if err != nil {
+               r.l.Err(fmt.Errorf("repair build status failure: %w", err))
+       }
+}
+
+func (r *repair) buildStatus() (err error) {
+       startTime := time.Now()
+       defer func() {
+               r.metrics.totalBuildTreeFinished.Inc(1)
+               if err != nil {
+                       r.metrics.totalBuildTreeFailures.Inc(1)
+               }
+               r.metrics.totalBuildTreeDuration.Inc(time.Since(startTime).Seconds())
+       }()
+       var state *repairStatus
+       // read the state file to check whether there are any updates
+       state, err = r.readState()
+       if err != nil {
+               return fmt.Errorf("reading state failure: %w", err)
+       }
+       indexConfig := index.DefaultConfig(r.shardPath)
+       items, err := indexConfig.DirectoryFunc().List(index.ItemKindSegment)
+       if err != nil {
+               return fmt.Errorf("reading item kind segment failure: %w", err)
+       }
+       sort.Sort(snapshotIDList(items))
+       // check whether the latest snapshot ID has changed
+       // if there are no updates, building the trees can be skipped
+       if state != nil && len(items) != 0 && items[len(items)-1] == state.LastSnpID {
+               return nil
+       }
+       if len(items) == 0 {
+               return nil
+       }
+
+       // otherwise, we need to build the trees
+       // take a new snapshot first
+       err = r.takeSnapshot(r.snapshotDir)
+       if err != nil {
+               return fmt.Errorf("taking snapshot failure: %w", err)
+       }
+       blugeConf := bluge.DefaultConfig(r.snapshotDir)
+       err = r.buildTree(blugeConf)
+       if err != nil {
+               return fmt.Errorf("building trees failure: %w", err)
+       }
+
+       var latestSnapshotID uint64
+       if len(items) > 0 {
+               latestSnapshotID = items[len(items)-1]
+       }
+       // record the latest snapshot ID and sync time in the state file
+       state = &repairStatus{
+               LastSnpID:    latestSnapshotID,
+               LastSyncTime: time.Now(),
+       }
+       stateVal, err := json.Marshal(state)
+       if err != nil {
+               return fmt.Errorf("marshal state failure: %w", err)
+       }
+       err = os.WriteFile(r.statePath, stateVal, storage.FilePerm)
+       if err != nil {
+               return fmt.Errorf("writing state file failure: %w", err)
+       }
+       return nil
+}
+
+func (r *repair) buildTree(conf bluge.Config) error {
+       reader, err := bluge.OpenReader(conf)
+       if err != nil {
+               return fmt.Errorf("opening index reader failure: %w", err)
+       }
+       defer func() {
+               _ = reader.Close()
+       }()
+       topNSearch := bluge.NewTopNSearch(r.batchSearchSize, bluge.NewMatchAllQuery())
+       topNSearch.SortBy([]string{
+               fmt.Sprintf("+%s", groupField),
+               fmt.Sprintf("+%s", nameField),
+               fmt.Sprintf("+%s", entityID),
+               fmt.Sprintf("+%s", timestampField),
+       })
+
+       var latestProperty *searchingProperty
+       treeComposer := newRepairTreeComposer(r.composeSlotAppendFilePath, r.composeTreeFilePathFmt, r.treeSlotCount, r.l)
+       if err != nil {
+               return fmt.Errorf("creating repair tree composer failure: %w", err)
+       }
+       err = r.pageSearch(reader, topNSearch, func(source []byte, shaValue string, deleteTime int64) error {

Review Comment:
   ```suggestion
        err = r.pageSearch(reader, topNSearch, func(shaValue string, deleteTime int64) error {
   ```
   
   You can get `group`, `name` and `entityID` from SortValue
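   
   For illustration, a minimal sketch of that idea (not part of the PR): assuming the
   sort keys land in `DocumentMatch.SortValue` as raw term bytes in the same order as
   the `SortBy` call above, a hypothetical helper such as `sortedKeyFields` could
   recover the three values without unmarshalling the stored source:
   
   ```go
   package property
   
   import "github.com/blugelabs/bluge/search"
   
   // sortedKeyFields reads group, name and entity ID straight from a match's sort
   // keys; SortValue holds one encoded key per SortBy entry, in the same order
   // (groupField, nameField, entityID, timestampField).
   func sortedKeyFields(match *search.DocumentMatch) (group, name, entityID string) {
           if len(match.SortValue) >= 3 {
                   group = string(match.SortValue[0])
                   name = string(match.SortValue[1])
                   entityID = string(match.SortValue[2])
           }
           return group, name, entityID
   }
   ```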



##########
banyand/property/repair.go:
##########
@@ -0,0 +1,853 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "bufio"
+       "crypto/sha512"
+       "encoding/binary"
+       "encoding/json"
+       "fmt"
+       "hash"
+       "io"
+       "os"
+       "path"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/benbjohnson/clock"
+       "github.com/blugelabs/bluge"
+       "github.com/blugelabs/bluge/index"
+       "github.com/cespare/xxhash/v2"
+       "github.com/pkg/errors"
+       "github.com/robfig/cron/v3"
+       "go.uber.org/multierr"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const repairBatchSearchSize = 100
+
+type repair struct {
+       latestBuildTreeSchedule   time.Time
+       buildTreeClock            clock.Clock
+       closer                    *run.Closer
+       repairTreeScheduler       *timestamp.Scheduler
+       metrics                   *repairMetrics
+       l                         *logger.Logger
+       quickRepairNotified       *int32
+       takeSnapshot              func(string) error
+       shardPath                 string
+       repairBasePath            string
+       snapshotDir               string
+       statePath                 string
+       composeSlotAppendFilePath string
+       composeTreeFilePathFmt    string
+       treeSlotCount             int
+       batchSearchSize           int
+       buildTreeScheduleInterval time.Duration
+       repairQuickBuildTreeTime  time.Duration
+       buildTreeLocker           sync.Mutex
+}
+
+func newRepair(
+       shardPath string,
+       l *logger.Logger,
+       metricsFactory *observability.Factory,
+       batchSearchSize,
+       treeSlotCount int,
+       repairBuildTreeCron string,
+       repairQuickBuildTreeTime time.Duration,
+       takeSnapshot func(string) error,
+) (r *repair, err error) {
+       repairBase := path.Join(shardPath, "repair")
+       var quickRepairNotified int32
+       r = &repair{
+               shardPath:                 shardPath,
+               l:                         l,
+               repairBasePath:            repairBase,
+               snapshotDir:               path.Join(repairBase, "snapshot"),
+               statePath:                 path.Join(repairBase, "state.json"),
+               composeSlotAppendFilePath: path.Join(repairBase, "state-append-%d.tmp"),
+               composeTreeFilePathFmt:    path.Join(repairBase, "state-tree-%s.data"),
+               treeSlotCount:             treeSlotCount,
+               batchSearchSize:           batchSearchSize,
+               takeSnapshot:              takeSnapshot,
+               repairQuickBuildTreeTime:  repairQuickBuildTreeTime,
+               quickRepairNotified:       &quickRepairNotified,
+               metrics:                   newRepairMetrics(metricsFactory),
+               closer:                    run.NewCloser(1),
+       }
+       if err = r.initScheduler(repairBuildTreeCron); err != nil {
+               return nil, fmt.Errorf("init scheduler: %w", err)
+       }
+       return r, nil
+}
+
+func (r *repair) initScheduler(exp string) error {
+       r.buildTreeClock = clock.New()
+       c := timestamp.NewScheduler(r.l, r.buildTreeClock)
+       r.repairTreeScheduler = c
+       err := c.Register("repair", cron.Descriptor, exp, func(t time.Time, _ *logger.Logger) bool {
+               r.doRepairScheduler(t, true)
+               return true
+       })
+       if err != nil {
+               return fmt.Errorf("failed to add repair build tree cron task: %w", err)
+       }
+       interval, nextTime, exist := c.Interval("repair")
+       if !exist {
+               return fmt.Errorf("failed to get repair build tree cron task interval")
+       }
+       r.buildTreeScheduleInterval = interval
+       r.latestBuildTreeSchedule = nextTime.Add(-interval)
+       return nil
+}
+
+func (r *repair) documentUpdatesNotify() {
+       if !atomic.CompareAndSwapInt32(r.quickRepairNotified, 0, 1) {
+               return
+       }
+
+       go func() {
+               select {
+               case <-r.closer.CloseNotify():
+                       return
+               case <-time.After(r.repairQuickBuildTreeTime):
+                       r.doRepairScheduler(r.buildTreeClock.Now(), false)
+                       // reset the notified flag to allow the next notification
+                       atomic.StoreInt32(r.quickRepairNotified, 0)
+               }
+       }()
+}
+
+func (r *repair) doRepairScheduler(t time.Time, triggerByCron bool) {
+       if !triggerByCron {
+               // when not triggered by cron, skip if the current time is already past the last scheduled time plus half of the schedule interval
+               if r.buildTreeClock.Now().After(r.latestBuildTreeSchedule.Add(r.buildTreeScheduleInterval / 2)) {
+                       return
+               }
+       } else {
+               r.latestBuildTreeSchedule = t
+       }
+
+       // if already building the tree, skip this run
+       if !r.buildTreeLocker.TryLock() {
+               return
+       }
+       defer r.buildTreeLocker.Unlock()
+       err := r.buildStatus()
+       if err != nil {
+               r.l.Err(fmt.Errorf("repair build status failure: %w", err))
+       }
+}
+
+func (r *repair) buildStatus() (err error) {
+       startTime := time.Now()
+       defer func() {
+               r.metrics.totalBuildTreeFinished.Inc(1)
+               if err != nil {
+                       r.metrics.totalBuildTreeFailures.Inc(1)
+               }
+               r.metrics.totalBuildTreeDuration.Inc(time.Since(startTime).Seconds())
+       }()
+       var state *repairStatus
+       // read the state file to check whether there are any updates
+       state, err = r.readState()
+       if err != nil {
+               return fmt.Errorf("reading state failure: %w", err)
+       }
+       indexConfig := index.DefaultConfig(r.shardPath)
+       items, err := indexConfig.DirectoryFunc().List(index.ItemKindSegment)
+       if err != nil {
+               return fmt.Errorf("reading item kind segment failure: %w", err)
+       }
+       sort.Sort(snapshotIDList(items))
+       // check whether the latest snapshot ID has changed
+       // if there are no updates, building the trees can be skipped
+       if state != nil && len(items) != 0 && items[len(items)-1] == state.LastSnpID {
+               return nil
+       }
+       if len(items) == 0 {
+               return nil
+       }
+
+       // otherwise, we need to build the trees
+       // take a new snapshot first
+       err = r.takeSnapshot(r.snapshotDir)
+       if err != nil {
+               return fmt.Errorf("taking snapshot failure: %w", err)
+       }
+       blugeConf := bluge.DefaultConfig(r.snapshotDir)
+       err = r.buildTree(blugeConf)
+       if err != nil {
+               return fmt.Errorf("building trees failure: %w", err)
+       }
+
+       var latestSnapshotID uint64
+       if len(items) > 0 {
+               latestSnapshotID = items[len(items)-1]
+       }
+       // record the latest snapshot ID and sync time in the state file
+       state = &repairStatus{
+               LastSnpID:    latestSnapshotID,
+               LastSyncTime: time.Now(),
+       }
+       stateVal, err := json.Marshal(state)
+       if err != nil {
+               return fmt.Errorf("marshal state failure: %w", err)
+       }
+       err = os.WriteFile(r.statePath, stateVal, storage.FilePerm)
+       if err != nil {
+               return fmt.Errorf("writing state file failure: %w", err)
+       }
+       return nil
+}
+
+func (r *repair) buildTree(conf bluge.Config) error {
+       reader, err := bluge.OpenReader(conf)
+       if err != nil {
+               return fmt.Errorf("opening index reader failure: %w", err)
+       }
+       defer func() {
+               _ = reader.Close()
+       }()
+       topNSearch := bluge.NewTopNSearch(r.batchSearchSize, bluge.NewMatchAllQuery())
+       topNSearch.SortBy([]string{
+               fmt.Sprintf("+%s", groupField),
+               fmt.Sprintf("+%s", nameField),
+               fmt.Sprintf("+%s", entityID),
+               fmt.Sprintf("+%s", timestampField),
+       })
+
+       var latestProperty *searchingProperty
+       treeComposer := newRepairTreeComposer(r.composeSlotAppendFilePath, r.composeTreeFilePathFmt, r.treeSlotCount, r.l)
+       if err != nil {
+               return fmt.Errorf("creating repair tree composer failure: %w", err)
+       }
+       err = r.pageSearch(reader, topNSearch, func(source []byte, shaValue string, deleteTime int64) error {
+               // building the entity
+               if len(source) == 0 {
+                       return nil
+               }
+               var property propertyv1.Property
+               err = protojson.Unmarshal(source, &property)
+               if err != nil {
+                       return err
+               }
+               entity := GetEntity(&property)
+               if shaValue == "" {
+                       shaValue, err = r.buildShaValue(source, deleteTime)
+                       if err != nil {
+                               return fmt.Errorf("building sha value failure: %w", err)
+                       }
+               }
+
+               s := newSearchingProperty(&property, shaValue, entity)
+               if latestProperty != nil {
+                       if latestProperty.group != property.Metadata.Group {
+                               // the group has changed: append the pending property to the tree composer,
+                               // then compose and save the tree of the finished group
+                               if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
+                                       return fmt.Errorf("appending property to tree composer failure: %w", err)
+                               }
+                               err = treeComposer.composeNextGroupAndSave(latestProperty.group)
+                               if err != nil {
+                                       return fmt.Errorf("composing group failure: %w", err)
+                               }
+                       } else if latestProperty.entityID != entity {
+                               // the entity has changed, so the pending property needs to be saved
+                               if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
+                                       return fmt.Errorf("appending property to tree composer failure: %w", err)
+                               }
+                       }
+               }
+               latestProperty = s
+               return nil
+       })
+       if err != nil {
+               return err
+       }
+       // if latestProperty is not nil, the last pending property still needs to be saved
+       if latestProperty != nil {
+               if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
+                       return fmt.Errorf("appending latest property to tree composer failure: %w", err)
+               }
+               err = treeComposer.composeNextGroupAndSave(latestProperty.group)
+               if err != nil {
+                       return fmt.Errorf("composing last group failure: %w", err)
+               }
+       }
+
+       return nil
+}
+
+//nolint:contextcheck
+func (r *repair) pageSearch(reader *bluge.Reader, searcher *bluge.TopNSearch, each func(source []byte, shaValue string, deleteTime int64) error) error {
+       var latestDocValues [][]byte
+       for {
+               searcher.After(latestDocValues)
+               result, err := reader.Search(r.closer.Ctx(), searcher)
+               if err != nil {
+                       return fmt.Errorf("searching index failure: %w", err)
+               }
+
+               next, err := result.Next()
+               var hitNumber int
+               if err != nil {
+                       return errors.WithMessage(err, "iterate document match iterator")
+               }
+               // if next is nil, it means no more documents to process
+               if next == nil {
+                       return nil
+               }
+               var shaValue string
+               var source []byte
+               var deleteTime int64
+               for err == nil && next != nil {
+                       hitNumber = next.HitNumber
+                       var errTime error
+                       err = next.VisitStoredFields(func(field string, value []byte) bool {
+                               switch field {
+                               case shaValueField:
+                                       shaValue = convert.BytesToString(value)
+                               case sourceField:
+                                       source = value
+                               case deleteField:
+                                       deleteTime = convert.BytesToInt64(value)
+                               }
+                               return true
+                       })
+                       if err = multierr.Combine(err, errTime); err != nil {
+                               return errors.WithMessagef(err, "visit stored fields, hit: %d", hitNumber)
+                       }
+                       err = each(source, shaValue, deleteTime)
+                       if err != nil {
+                               return errors.WithMessagef(err, "processing source failure, hit: %d", hitNumber)
+                       }
+                       latestDocValues = next.SortValue
+                       next, err = result.Next()
+               }
+       }
+}
+
+func (r *repair) buildShaValue(source []byte, deleteTime int64) (string, error) {
+       var err error
+       hash := sha512.New()
+       _, err = hash.Write(source)
+       if err != nil {
+               return "", fmt.Errorf("hashing source failure: %w", err)
+       }
+       _, err = hash.Write([]byte(fmt.Sprintf("%d", deleteTime)))
+       if err != nil {
+               return "", fmt.Errorf("hashing delete time failure: %w", err)
+       }
+       return fmt.Sprintf("%x", hash.Sum(nil)), nil
+}
+
+func (r *repair) readState() (*repairStatus, error) {
+       stateFile, err := os.ReadFile(r.statePath)
+       if err != nil {
+               if os.IsNotExist(err) {
+                       return nil, nil
+               }
+               return nil, err
+       }
+       var status repairStatus
+       err = json.Unmarshal(stateFile, &status)
+       if err != nil {
+               return nil, err
+       }
+       return &status, nil
+}
+
+func (r *repair) close() {
+       r.closer.Done()
+       r.closer.CloseThenWait()
+       r.repairTreeScheduler.Close()
+}
+
+type repairStatus struct {
+       LastSyncTime time.Time `json:"last_sync_time"`
+       LastSnpID    uint64    `json:"last_snp_id"`
+}
+
+type repairTreeNodeType int
+
+const (
+       repairTreeNodeTypeRoot repairTreeNodeType = iota
+       repairTreeNodeTypeSlot
+       repairTreeNodeTypeLeaf
+)
+
+type repairTreeNode struct {
+       shaValue  string
+       id        string
+       tp        repairTreeNodeType
+       leafStart int64
+       leafLen   int64
+}
+type repairTreeReader struct {
+       file   *os.File
+       reader *bufio.Reader
+       footer *repairTreeFooter
+}
+
+func (r *repair) treeReader(group string) (*repairTreeReader, error) {
+       groupFile := fmt.Sprintf(r.composeTreeFilePathFmt, group)
+       file, err := os.OpenFile(groupFile, os.O_RDONLY, os.ModePerm)
+       if err != nil {
+               if errors.Is(err, os.ErrNotExist) {
+                       // if the file does not exist, it means no repair tree for this group
+                       return nil, nil
+               }
+               return nil, fmt.Errorf("opening repair tree file %s failure: %w", group, err)
+       }
+       reader := &repairTreeReader{
+               file:   file,
+               reader: bufio.NewReader(file),
+       }
+       if err = reader.readFoot(); err != nil {
+               _ = file.Close()
+               return nil, fmt.Errorf("reading footer from repair tree file %s failure: %w", groupFile, err)
+       }
+       return reader, nil
+}
+
+func (r *repairTreeReader) readFoot() error {
+       stat, err := r.file.Stat()
+       if err != nil {
+               return fmt.Errorf("getting file stat for %s failure: %w", r.file.Name(), err)
+       }
+       footerOffset := stat.Size() - 1
+       if _, err = r.file.Seek(footerOffset, io.SeekStart); err != nil {
+               return fmt.Errorf("seeking to footer offset %d in file %s failure: %w", footerOffset, r.file.Name(), err)
+       }
+       var footLen uint8
+       if err = binary.Read(r.file, binary.LittleEndian, &footLen); err != nil {
+               return fmt.Errorf("reading footer length from file %s failure: %w", r.file.Name(), err)
+       }
+       _, err = r.file.Seek(-(int64(footLen) + 1), io.SeekCurrent)
+       if err != nil {
+               return fmt.Errorf("seeking to start of footer in file %s failure: %w", r.file.Name(), err)
+       }
+       footerBytes := make([]byte, footLen)
+       if _, err = io.ReadFull(r.file, footerBytes); err != nil {
+               return fmt.Errorf("reading footer from file %s failure: %w", r.file.Name(), err)
+       }
+       footerData := make([]int64, 4)
+       _, err = encoding.BytesToVarInt64List(footerData, footerBytes)
+       if err != nil {
+               return fmt.Errorf("decoding footer from file %s failure: %w", r.file.Name(), err)
+       }
+       r.footer = &repairTreeFooter{
+               leafNodeFinishedOffset: footerData[0],
+               slotNodeLen:            footerData[1],
+               slotNodeFinishedOffset: footerData[2],
+               rootNodeLen:            footerData[3],
+       }
+       return nil
+}
+
+func (r *repairTreeReader) seekPosition(offset int64, whence int) error {
+       _, err := r.file.Seek(offset, whence)
+       if err != nil {
+               return fmt.Errorf("seeking position failure: %w", err)
+       }
+       r.reader.Reset(r.file)
+       return nil
+}
+
+func (r *repairTreeReader) read(parent *repairTreeNode) ([]*repairTreeNode, error) {
+       if parent == nil {
+               // reading the root node
+               err := r.seekPosition(r.footer.slotNodeFinishedOffset, io.SeekStart)
+               if err != nil {
+                       return nil, fmt.Errorf("seeking to root node offset %d in file %s failure: %w", r.footer.slotNodeFinishedOffset, r.file.Name(), err)
+               }
+               rootDataBytes := make([]byte, r.footer.rootNodeLen)
+               if _, err = io.ReadFull(r.reader, rootDataBytes); err != nil {
+                       return nil, fmt.Errorf("reading root node data from file %s failure: %w", r.file.Name(), err)
+               }
+               _, shaValue, err := encoding.DecodeBytes(rootDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("decoding root node sha value from file %s failure: %w", r.file.Name(), err)
+               }
+               return []*repairTreeNode{
+                       {
+                               shaValue: fmt.Sprintf("%x", shaValue),
+                               tp:       repairTreeNodeTypeRoot,
+                       },
+               }, nil
+       }
+
+       var err error
+       nodes := make([]*repairTreeNode, 0)
+       if parent.tp == repairTreeNodeTypeRoot {
+               // reading the slot nodes
+               if err = r.seekPosition(r.footer.leafNodeFinishedOffset, io.SeekStart); err != nil {
+                       return nil, fmt.Errorf("seeking to slot node offset %d in file %s failure: %w", r.footer.leafNodeFinishedOffset, r.file.Name(), err)
+               }
+               slotDataBytes := make([]byte, r.footer.slotNodeLen)
+               if _, err = io.ReadFull(r.reader, slotDataBytes); err != nil {
+                       return nil, fmt.Errorf("reading slot node data from file %s failure: %w", r.file.Name(), err)
+               }
+               reduceBytesLen := int64(len(slotDataBytes))
+
+               var slotNodeIndex, leafStartOff, leafLen int64
+               var slotShaVal []byte
+               for reduceBytesLen > 0 {
+                       slotDataBytes, slotNodeIndex, err = encoding.BytesToVarInt64(slotDataBytes)
+                       if err != nil {
+                               return nil, fmt.Errorf("decoding slot node index from file %s failure: %w", r.file.Name(), err)
+                       }
+                       slotDataBytes, slotShaVal, err = encoding.DecodeBytes(slotDataBytes)
+                       if err != nil {
+                               return nil, fmt.Errorf("decoding slot node sha value from file %s failure: %w", r.file.Name(), err)
+                       }
+                       slotDataBytes, leafStartOff, err = encoding.BytesToVarInt64(slotDataBytes)
+                       if err != nil {
+                               return nil, fmt.Errorf("decoding slot node leaf start offset from file %s failure: %w", r.file.Name(), err)
+                       }
+                       slotDataBytes, leafLen, err = encoding.BytesToVarInt64(slotDataBytes)
+                       if err != nil {
+                               return nil, fmt.Errorf("decoding slot node leaf length from file %s failure: %w", r.file.Name(), err)
+                       }
+                       reduceBytesLen = int64(len(slotDataBytes))
+                       nodes = append(nodes, &repairTreeNode{
+                               shaValue:  fmt.Sprintf("%x", slotShaVal),
+                               id:        fmt.Sprintf("%d", slotNodeIndex),
+                               tp:        repairTreeNodeTypeSlot,
+                               leafStart: leafStartOff,
+                               leafLen:   leafLen,
+                       })
+               }
+               return nodes, nil
+       } else if parent.tp == repairTreeNodeTypeLeaf {
+               return nil, nil
+       }
+
+       // otherwise, reading the leaf nodes
+       err = r.seekPosition(parent.leafStart, io.SeekStart)
+       if err != nil {
+               return nil, fmt.Errorf("seeking to leaf node offset %d in file %s failure: %w", r.footer.leafNodeFinishedOffset, r.file.Name(), err)
+       }
+       leafDataBytes := make([]byte, parent.leafLen)
+       if _, err = io.ReadFull(r.reader, leafDataBytes); err != nil {
+               return nil, fmt.Errorf("reading leaf node data from file %s failure: %w", r.file.Name(), err)
+       }
+       reduceBytesLen := int64(len(leafDataBytes))
+       for reduceBytesLen > 0 {
+               var entity, shaVal []byte
+               leafDataBytes, entity, err = encoding.DecodeBytes(leafDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("decoding leaf node entity from file %s failure: %w", r.file.Name(), err)
+               }
+               leafDataBytes, shaVal, err = encoding.DecodeBytes(leafDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("decoding leaf node sha value from file %s failure: %w", r.file.Name(), err)
+               }
+               reduceBytesLen = int64(len(leafDataBytes))
+               nodes = append(nodes, &repairTreeNode{
+                       shaValue: string(shaVal),
+                       id:       string(entity),
+                       tp:       repairTreeNodeTypeLeaf,
+               })
+       }
+       return nodes, nil
+}
+
+type repairTreeFooter struct {
+       leafNodeFinishedOffset int64
+       slotNodeLen            int64
+       slotNodeFinishedOffset int64
+       rootNodeLen            int64
+}
+
+type snapshotIDList []uint64
+
+func (s snapshotIDList) Len() int           { return len(s) }
+func (s snapshotIDList) Less(i, j int) bool { return s[i] < s[j] }
+func (s snapshotIDList) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+
+type repairTreeComposer struct {
+       l             *logger.Logger
+       appendFileFmt string
+       treeFileFmt   string
+       slotFiles     []*repairSlotFile
+       slotCount     int
+}
+
+func newRepairTreeComposer(appendSlotFilePathFmt, treeFilePathFmt string, slotCount int, l *logger.Logger) *repairTreeComposer {
+       return &repairTreeComposer{
+               appendFileFmt: appendSlotFilePathFmt,
+               treeFileFmt:   treeFilePathFmt,
+               slotCount:     slotCount,
+               slotFiles:     make([]*repairSlotFile, slotCount),
+               l:             l,
+       }
+}
+
+func (r *repairTreeComposer) append(id, shaVal string) (err error) {
+       idBytes := []byte(id)
+       shaValBytes := []byte(shaVal)
+       val := xxhash.Sum64(idBytes)
+       slotIndex := val % uint64(r.slotCount)
+       file := r.slotFiles[slotIndex]
+       if file == nil {
+               file, err = newRepairSlotFile(int(slotIndex), r.appendFileFmt, r.l)
+               if err != nil {
+                       return fmt.Errorf("creating repair slot file for slot %d failure: %w", slotIndex, err)
+               }
+               r.slotFiles[slotIndex] = file
+       }
+       return file.append(idBytes, shaValBytes)
+}
+
+// composeNextGroupAndSave composes the current group of slot files into a repair tree file.
+// tree file format: [leaf nodes]+[slot nodes]+[root node]+[metadata]
+// leaf nodes: each node contains: [entity]+[sha value]
+// slot nodes: each node contains: [slot index]+[sha value]+[leaf nodes start offset]+[leaf nodes data length(binary encoded)]
+// root node: contains [sha value]
+// metadata: contains footer([slot nodes start offset]+[slot nodes length(data binary)]+[root node start offset]+[root node length])+[footer length(data binary)]
+func (r *repairTreeComposer) composeNextGroupAndSave(group string) (err error) {
+       treeFilePath := fmt.Sprintf(r.treeFileFmt, group)
+       treeBuilder, err := newRepairTreeBuilder(treeFilePath)
+       if err != nil {
+               return fmt.Errorf("creating repair tree builder for group %s failure: %w", group, err)
+       }
+       defer func() {
+               if closeErr := treeBuilder.close(); closeErr != nil {
+                       err = multierr.Append(err, fmt.Errorf("closing repair tree builder for group %s failure: %w", group, closeErr))
+               }
+       }()
+       for _, f := range r.slotFiles {
+               if f == nil {
+                       continue
+               }
+               if err = treeBuilder.appendSlot(f); err != nil {
+                       return fmt.Errorf("appending slot file %s to repair tree builder for group %s failure: %w", f.path, group, err)
+               }
+       }
+       // reset the slot files for the next group
+       r.slotFiles = make([]*repairSlotFile, r.slotCount)

Review Comment:
   You can set `r.slotFiles[i]=nil` to reset the slotFiles in the upper loop.
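   
   A minimal sketch of that suggestion (not the final implementation), reusing the
   loop from `composeNextGroupAndSave` above and clearing each entry in place instead
   of reallocating the slice afterwards:
   
   ```go
   for i, f := range r.slotFiles {
           if f == nil {
                   continue
           }
           if err = treeBuilder.appendSlot(f); err != nil {
                   return fmt.Errorf("appending slot file %s to repair tree builder for group %s failure: %w", f.path, group, err)
           }
           // reset this slot for the next group; the trailing make([]*repairSlotFile, ...) is then unnecessary
           r.slotFiles[i] = nil
   }
   ```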


