This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new d1426ecb Add a sharded buffer to ingest data (#262) d1426ecb is described below commit d1426ecbbf5ccda47e38103666fc2c6ef19fd58b Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Fri Mar 24 08:08:33 2023 +0800 Add a sharded buffer to ingest data (#262) * Add a sharded buffer to ingest data Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- CHANGES.md | 1 + banyand/tsdb/buffer.go | 213 ++++++++++++++++++++++++++++++++++++++++++++ banyand/tsdb/buffer_test.go | 149 +++++++++++++++++++++++++++++++ 3 files changed, 363 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index d4df9af1..17c09ea6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,7 @@ Release Notes. - Add TSDB concept document. - [UI] Add YAML editor for inputting query criteria. - Refactor TopN to support `NULL` group while keeping seriesID from the source measure. +- Add a sharded buffer to TSDB. ### Chores diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go new file mode 100644 index 00000000..8372554a --- /dev/null +++ b/banyand/tsdb/buffer.go @@ -0,0 +1,213 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tsdb + +import ( + "fmt" + "sync" + "time" + + "github.com/dgraph-io/badger/v3/skl" + "github.com/dgraph-io/badger/v3/y" + + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +const defaultSize = 1 << 20 // 1MB + +type operation struct { + key []byte + value []byte + epoch uint64 +} + +type flushEvent struct { + data *skl.Skiplist +} + +type onFlush func(shardIndex int, skl *skl.Skiplist) error + +type bufferShardBucket struct { + mutable *skl.Skiplist + writeCh chan operation + flushCh chan flushEvent + writeWaitGroup *sync.WaitGroup + flushWaitGroup *sync.WaitGroup + log *logger.Logger + immutables []*skl.Skiplist + index int + flushSize int + size int + mutex sync.RWMutex +} + +// Buffer is an exported struct that represents a buffer composed of multiple shard buckets. +type Buffer struct { + onFlushFn onFlush + entryCloser *run.Closer + log *logger.Logger + buckets []bufferShardBucket + writeWaitGroup sync.WaitGroup + flushWaitGroup sync.WaitGroup + numShards int + closerOnce sync.Once +} + +// NewBuffer creates a new Buffer instance with the given parameters. +func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, onFlushFn onFlush) (*Buffer, error) { + size := flushSize + if size < defaultSize { + size = defaultSize + } + buckets := make([]bufferShardBucket, numShards) + buffer := &Buffer{ + buckets: buckets, + numShards: numShards, + onFlushFn: onFlushFn, + entryCloser: run.NewCloser(1), + log: log.Named("buffer"), + } + buffer.writeWaitGroup.Add(numShards) + buffer.flushWaitGroup.Add(numShards) + for i := 0; i < numShards; i++ { + buckets[i] = bufferShardBucket{ + index: i, + size: size, + mutable: skl.NewSkiplist(int64(size)), + flushSize: flushSize, + writeCh: make(chan operation, writeConcurrency), + flushCh: make(chan flushEvent, 1), + writeWaitGroup: &buffer.writeWaitGroup, + flushWaitGroup: &buffer.flushWaitGroup, + log: buffer.log.Named(fmt.Sprintf("shard-%d", i)), + } + buckets[i].start(onFlushFn) + } + return buffer, nil +} + +// Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer. +func (b *Buffer) Write(key, value []byte, timestamp time.Time) { + if !b.entryCloser.AddRunning() { + return + } + defer b.entryCloser.Done() + index := b.getShardIndex(key) + if b.log.Debug().Enabled() { + b.log.Debug().Uint64("shard", index).Bytes("key", key). + Time("ts", timestamp).Msg("route a shard") + } + b.buckets[index].writeCh <- operation{key: key, value: value, epoch: uint64(timestamp.UnixNano())} +} + +// Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer. +func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) { + keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano())) + index := b.getShardIndex(key) + epoch := uint64(ts.UnixNano()) + for _, bk := range b.buckets[index].getAll() { + value := bk.Get(keyWithTS) + if value.Meta == 0 && value.Value == nil { + continue + } + if value.Version == epoch { + return value.Value, true + } + } + return nil, false +} + +// Close gracefully closes the Buffer and ensures that all pending operations are completed. +func (b *Buffer) Close() error { + b.closerOnce.Do(func() { + b.entryCloser.Done() + b.entryCloser.CloseThenWait() + for i := 0; i < b.numShards; i++ { + close(b.buckets[i].writeCh) + } + b.writeWaitGroup.Wait() + for i := 0; i < b.numShards; i++ { + if err := b.onFlushFn(i, b.buckets[i].mutable); err != nil { + b.buckets[i].log.Err(err).Msg("flushing mutable buffer failed") + } + b.buckets[i].mutable.DecrRef() + } + for i := 0; i < b.numShards; i++ { + close(b.buckets[i].flushCh) + } + b.flushWaitGroup.Wait() + }) + return nil +} + +func (b *Buffer) getShardIndex(key []byte) uint64 { + return convert.Hash(key) % uint64(b.numShards) +} + +func (bsb *bufferShardBucket) getAll() []*skl.Skiplist { + bsb.mutex.RLock() + defer bsb.mutex.RUnlock() + allList := make([]*skl.Skiplist, len(bsb.immutables)+1) + bsb.mutable.IncrRef() + allList[0] = bsb.mutable + last := len(bsb.immutables) - 1 + for i := range bsb.immutables { + allList[i+1] = bsb.immutables[last-i] + bsb.immutables[last-i].IncrRef() + } + return allList +} + +func (bsb *bufferShardBucket) start(onFlushFn onFlush) { + go func() { + defer bsb.flushWaitGroup.Done() + for event := range bsb.flushCh { + oldSkipList := event.data + if err := onFlushFn(bsb.index, oldSkipList); err != nil { + bsb.log.Err(err).Msg("flushing immutable buffer failed") + continue + } + bsb.mutex.Lock() + bsb.immutables = bsb.immutables[1:] + oldSkipList.DecrRef() + bsb.mutex.Unlock() + } + }() + go func() { + defer bsb.writeWaitGroup.Done() + for op := range bsb.writeCh { + bsb.mutex.Lock() + if bsb.mutable.MemSize() >= int64(bsb.flushSize) { + select { + case bsb.flushCh <- flushEvent{data: bsb.mutable}: + default: + } + bsb.swap() + } + bsb.mutex.Unlock() + bsb.mutable.Put(y.KeyWithTs(op.key, op.epoch), y.ValueStruct{Value: op.value, Version: op.epoch}) + } + }() +} + +func (bsb *bufferShardBucket) swap() { + bsb.immutables = append(bsb.immutables, bsb.mutable) + bsb.mutable = skl.NewSkiplist(int64(bsb.size)) +} diff --git a/banyand/tsdb/buffer_test.go b/banyand/tsdb/buffer_test.go new file mode 100644 index 00000000..41e654e5 --- /dev/null +++ b/banyand/tsdb/buffer_test.go @@ -0,0 +1,149 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package tsdb_test + +import ( + "bytes" + "crypto/rand" + "fmt" + "math/big" + "sync" + "time" + + "github.com/dgraph-io/badger/v3/skl" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/gleak" + + "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test/flags" +) + +var _ = Describe("Buffer", func() { + var ( + buffer *tsdb.Buffer + log = logger.GetLogger("buffer-test") + goods []gleak.Goroutine + ) + + BeforeEach(func() { + goods = gleak.Goroutines() + }) + AfterEach(func() { + Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + }) + Context("Write and Read", func() { + BeforeEach(func() { + var err error + buffer, err = tsdb.NewBuffer(log, 1024*1024, 16, 4, func(shardIndex int, skl *skl.Skiplist) error { + return nil + }) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + buffer.Close() + }) + It("should write and read data correctly", func() { + var wg sync.WaitGroup + wg.Add(100) + + for i := 0; i < 100; i++ { + go func(idx int) { + defer GinkgoRecover() + defer wg.Done() + + key := []byte(fmt.Sprintf("key-%d", idx)) + value := []byte(fmt.Sprintf("value-%d", idx)) + ts := time.Now() + + buffer.Write(key, value, ts) + Eventually(func(g Gomega) { + readValue, ok := buffer.Read(key, ts) + g.Expect(ok).To(BeTrue()) + g.Expect(bytes.Equal(value, readValue)).To(BeTrue()) + }, flags.EventuallyTimeout).Should(Succeed()) + }(i) + } + + wg.Wait() + }) + }) + + Context("Flush", func() { + It("should trigger flush when buffer size exceeds the limit", func() { + numShards := 4 + doneChs := make([]chan struct{}, numShards) + for i := 0; i < numShards; i++ { + doneChs[i] = make(chan struct{}) + } + + onFlushFn := func(shardIndex int, skl *skl.Skiplist) error { + if doneChs[shardIndex] == nil { + return nil + } + close(doneChs[shardIndex]) + doneChs[shardIndex] = nil + return nil + } + + var wg sync.WaitGroup + wg.Add(numShards) + + for _, ch := range doneChs { + go func(c <-chan struct{}) { + select { + case res := <-c: + GinkgoWriter.Printf("Received value: %d\n", res) + case <-time.After(10 * time.Second): + GinkgoWriter.Printf("Timeout") + } + wg.Done() + }(ch) + } + + buffer, err := tsdb.NewBuffer(log, 1024, 16, numShards, onFlushFn) + defer func() { + _ = buffer.Close() + }() + Expect(err).ToNot(HaveOccurred()) + + randInt := func() int { + n, err := rand.Int(rand.Reader, big.NewInt(1000)) + if err != nil { + return 0 + } + return int(n.Int64()) + } + for i := 0; i < 1000; i++ { + key := fmt.Sprintf("key-%d", randInt()) + value := fmt.Sprintf("value-%d", randInt()) + ts := time.Now() + + buffer.Write([]byte(key), []byte(value), ts) + } + + wg.Wait() + for i, elem := range doneChs { + if elem != nil { + Fail(fmt.Sprintf("%d in doneChs is not nil", i)) + } + } + }) + }) +})