This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch compaction-algorithm
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 84bf6dc77980a8bbe76b36610646ff329884cbf7
Author: Megrez Lu <lujiajing1...@gmail.com>
AuthorDate: Sat Jan 13 22:00:23 2024 +0800

    add lowest apml policy
---
 banyand/measure/merger_algo.go | 90 ++++++++++++++++++++++++++++++++++++++----
 1 file changed, 83 insertions(+), 7 deletions(-)

diff --git a/banyand/measure/merger_algo.go b/banyand/measure/merger_algo.go
index 5749ee37..7aa60966 100644
--- a/banyand/measure/merger_algo.go
+++ b/banyand/measure/merger_algo.go
@@ -17,26 +17,102 @@ package measure
+import (
+	"sort"
+)
+
 var (
 	_ MergePolicy = (*noopMergePolicy)(nil)
 	_ MergePolicy = (*lowestWriteAmplificationMergePolicy)(nil)
 )
 
+// MergePolicy selects parts to be merged.
 type MergePolicy interface {
-	GetPartsToMerge(pws []*partWrapper, maxFanOut uint64) []*partWrapper
+	GetPartsToMerge(src []*partWrapper, maxFanOut uint64) []*partWrapper
 }
 
-// noopMergePolicy aims to merge all parts
+// noopMergePolicy aims to merge all parts.
 type noopMergePolicy struct{}
 
-func (s *noopMergePolicy) GetPartsToMerge(pws []*partWrapper, _ uint64) []*partWrapper {
-	return pws
+func (s *noopMergePolicy) GetPartsToMerge(src []*partWrapper, _ uint64) []*partWrapper {
+	return src
 }
 
+// lowestWriteAmplificationMergePolicy aims to choose the combination of parts
+// with the lowest write amplification.
+// It is adapted from https://github.com/VictoriaMetrics/VictoriaMetrics/blob/v0.4.2-victorialogs/lib/logstorage/datadb.go,
+// though the parameters may vary.
 type lowestWriteAmplificationMergePolicy struct {
+	maxParts           int
+	minMergeMultiplier float64
+}
+
+func (l *lowestWriteAmplificationMergePolicy) GetPartsToMerge(src []*partWrapper, maxFanOut uint64) (dst []*partWrapper) {
+	sortPartsForOptimalMerge(src)
+
+	maxSrcParts := l.maxParts
+	if maxSrcParts > len(src) {
+		maxSrcParts = len(src)
+	}
+	minSrcParts := (maxSrcParts + 1) / 2
+	if minSrcParts < 2 {
+		minSrcParts = 2
+	}
+
+	// Exhaustive search for the parts giving the lowest write amplification when merged.
+	var pws []*partWrapper
+	maxM := float64(0)
+	for i := minSrcParts; i <= maxSrcParts; i++ {
+		for j := 0; j <= len(src)-i; j++ {
+			a := src[j : j+i]
+			if a[0].p.partMetadata.CompressedSizeBytes*uint64(len(a)) < a[len(a)-1].p.partMetadata.CompressedSizeBytes {
+				// Do not merge parts with too big a difference in size,
+				// since this results in unbalanced merges.
+				continue
+			}
+			outSize := sumCompressedSize(a)
+			if outSize > maxFanOut {
+				// There is no need to verify the remaining parts, since they are bigger.
+				break
+			}
+			m := float64(outSize) / float64(a[len(a)-1].p.partMetadata.CompressedSizeBytes)
+			if m < maxM {
+				continue
+			}
+			maxM = m
+			pws = a
+		}
+	}
+
+	minM := float64(l.maxParts) / 2
+	if minM < l.minMergeMultiplier {
+		minM = l.minMergeMultiplier
+	}
+	if maxM < minM {
+		// There is no sense in merging parts with too small a multiplier m,
+		// since this leads to high disk write IO.
+		return dst
+	}
+	return append(dst, pws...)
+}
+
+func sortPartsForOptimalMerge(pws []*partWrapper) {
+	// Sort parts by size, breaking ties by the most recent timestamp first.
+	// This should improve adjacent points' locality in the merged parts.
+	sort.Slice(pws, func(i, j int) bool {
+		a := &pws[i].p.partMetadata
+		b := &pws[j].p.partMetadata
+		if a.CompressedSizeBytes == b.CompressedSizeBytes {
+			return a.MinTimestamp > b.MinTimestamp
+		}
+		return a.CompressedSizeBytes < b.CompressedSizeBytes
+	})
 }
 
-func (l *lowestWriteAmplificationMergePolicy) GetPartsToMerge(pws []*partWrapper, maxFanOut uint64) []*partWrapper {
-	//TODO implement me
-	panic("implement me")
+func sumCompressedSize(pws []*partWrapper) uint64 {
+	n := uint64(0)
+	for _, pw := range pws {
+		n += pw.p.partMetadata.CompressedSizeBytes
+	}
+	return n
 }
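
The key quantity in the patch is the merge multiplier m = outSize / largestInputSize: the higher m is, the smaller the largest input is relative to the merged output, so proportionally less already-merged data gets rewritten. For example, merging parts of 7, 8, 9, and 10 bytes writes 34 bytes to retire four parts (m = 34/10 = 3.4), whereas folding a 1000-byte part into the same merge would write 1034 bytes for m ≈ 1.03, mostly re-copying data that already sits in one part. The standalone sketch below mirrors the selection loop on plain sizes; pickPartsToMerge and its parameters are illustrative names, not part of this patch:

// merge_multiplier_sketch.go
// A minimal, self-contained sketch of the selection loop above, operating
// on plain sizes instead of *partWrapper. Hypothetical names throughout.
package main

import (
	"fmt"
	"sort"
)

// pickPartsToMerge scans every contiguous window of the sorted sizes and
// keeps the window with the highest merge multiplier m = sum(window) /
// max(window), i.e. the lowest write amplification.
func pickPartsToMerge(sizes []uint64, maxParts int, minMergeMultiplier float64, maxFanOut uint64) []uint64 {
	sort.Slice(sizes, func(i, j int) bool { return sizes[i] < sizes[j] })

	maxSrc := maxParts
	if maxSrc > len(sizes) {
		maxSrc = len(sizes)
	}
	minSrc := (maxSrc + 1) / 2
	if minSrc < 2 {
		minSrc = 2
	}

	var best []uint64
	maxM := 0.0
	for i := minSrc; i <= maxSrc; i++ {
		for j := 0; j <= len(sizes)-i; j++ {
			w := sizes[j : j+i]
			if w[0]*uint64(len(w)) < w[len(w)-1] {
				continue // too unbalanced: smallest part is tiny next to the largest
			}
			var out uint64
			for _, s := range w {
				out += s
			}
			if out > maxFanOut {
				break // windows further right only grow, so stop early
			}
			if m := float64(out) / float64(w[len(w)-1]); m >= maxM {
				maxM = m
				best = w
			}
		}
	}
	minM := float64(maxParts) / 2
	if minM < minMergeMultiplier {
		minM = minMergeMultiplier
	}
	if maxM < minM {
		return nil // merging now would cost more write IO than it saves
	}
	return best
}

func main() {
	// The four similar parts merge with m = 34/10 = 3.4; pulling in the
	// 1000-byte part would drop m to ~1.03, so it is (correctly) left alone.
	fmt.Println(pickPartsToMerge([]uint64{8, 9, 10, 1000, 7}, 4, 1.7, 1<<20))
	// Output: [7 8 9 10]
}

Two design points carry over from the patch: minSrcParts = (maxSrcParts+1)/2 biases the search toward wide merges, which retire more parts per byte written, and the final maxM < minM guard defers merging entirely when even the best candidate's multiplier is too small to justify the disk writes.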