This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch compaction-algorithm
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 84bf6dc77980a8bbe76b36610646ff329884cbf7
Author: Megrez Lu <lujiajing1...@gmail.com>
AuthorDate: Sat Jan 13 22:00:23 2024 +0800

    add lowest write amplification merge policy
---
 banyand/measure/merger_algo.go | 90 ++++++++++++++++++++++++++++++++++++++----
 1 file changed, 83 insertions(+), 7 deletions(-)

diff --git a/banyand/measure/merger_algo.go b/banyand/measure/merger_algo.go
index 5749ee37..7aa60966 100644
--- a/banyand/measure/merger_algo.go
+++ b/banyand/measure/merger_algo.go
@@ -17,26 +17,102 @@
 
 package measure
 
+import (
+       "sort"
+)
+
 var (
	// Compile-time proof that every merge policy in this file implements MergePolicy.
	_ MergePolicy = (*lowestWriteAmplificationMergePolicy)(nil)
	_ MergePolicy = (*noopMergePolicy)(nil)
)
 
+// MergePolicy is to select parts to be merged.
 type MergePolicy interface {
-       GetPartsToMerge(pws []*partWrapper, maxFanOut uint64) []*partWrapper
+       GetPartsToMerge(src []*partWrapper, maxFanOut uint64) []*partWrapper
 }
 
-// noopMergePolicy aims to merge all parts
+// noopMergePolicy aims to merge all parts.
 type noopMergePolicy struct{}
 
-func (s *noopMergePolicy) GetPartsToMerge(pws []*partWrapper, _ uint64) 
[]*partWrapper {
-       return pws
+func (s *noopMergePolicy) GetPartsToMerge(src []*partWrapper, _ uint64) 
[]*partWrapper {
+       return src
 }
 
+// lowestWriteAmplificationMergePolicy aims to choose an optimal combination
+// that has the lowest write amplification.
+// This policy can be referred to 
https://github.com/VictoriaMetrics/VictoriaMetrics/blob/v0.4.2-victorialogs/lib/logstorage/datadb.go
+// But parameters can be varied.
 type lowestWriteAmplificationMergePolicy struct {
+       maxParts           int
+       minMergeMultiplier float64
+}
+
+func (l *lowestWriteAmplificationMergePolicy) GetPartsToMerge(src 
[]*partWrapper, maxFanOut uint64) (dst []*partWrapper) {
+       sortPartsForOptimalMerge(src)
+
+       maxSrcParts := l.maxParts
+       if maxSrcParts > len(src) {
+               maxSrcParts = len(src)
+       }
+       minSrcParts := (maxSrcParts + 1) / 2
+       if minSrcParts < 2 {
+               minSrcParts = 2
+       }
+
+       // Exhaustive search for parts giving the lowest write amplification 
when merged.
+       var pws []*partWrapper
+       maxM := float64(0)
+       for i := minSrcParts; i <= maxSrcParts; i++ {
+               for j := 0; j <= len(src)-i; j++ {
+                       a := src[j : j+i]
+                       if 
a[0].p.partMetadata.CompressedSizeBytes*uint64(len(a)) < 
a[len(a)-1].p.partMetadata.CompressedSizeBytes {
+                               // Do not merge parts with too big difference 
in size,
+                               // since this results in unbalanced merges.
+                               continue
+                       }
+                       outSize := sumCompressedSize(a)
+                       if outSize > maxFanOut {
+                               // There is no need in verifying remaining 
parts with bigger sizes.
+                               break
+                       }
+                       m := float64(outSize) / 
float64(a[len(a)-1].p.partMetadata.CompressedSizeBytes)
+                       if m < maxM {
+                               continue
+                       }
+                       maxM = m
+                       pws = a
+               }
+       }
+
+       minM := float64(l.maxParts) / 2
+       if minM < l.minMergeMultiplier {
+               minM = l.minMergeMultiplier
+       }
+       if maxM < minM {
+               // There is no sense in merging parts with too small m,
+               // since this leads to high disk write IO.
+               return dst
+       }
+       return append(dst, pws...)
+}
+
+func sortPartsForOptimalMerge(pws []*partWrapper) {
+       // Sort src parts by size and backwards timestamp.
+       // This should improve adjacent points' locality in the merged parts.
+       sort.Slice(pws, func(i, j int) bool {
+               a := &pws[i].p.partMetadata
+               b := &pws[j].p.partMetadata
+               if a.CompressedSizeBytes == b.CompressedSizeBytes {
+                       return a.MinTimestamp > b.MinTimestamp
+               }
+               return a.CompressedSizeBytes < b.CompressedSizeBytes
+       })
 }
 
-func (l *lowestWriteAmplificationMergePolicy) GetPartsToMerge(pws 
[]*partWrapper, maxFanOut uint64) []*partWrapper {
-       //TODO implement me
-       panic("implement me")
+func sumCompressedSize(pws []*partWrapper) uint64 {
+       n := uint64(0)
+       for _, pw := range pws {
+               n += pw.p.partMetadata.CompressedSizeBytes
+       }
+       return n
 }

Reply via email to