This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new d1426ecb Add a sharded buffer to ingest data (#262)
d1426ecb is described below

commit d1426ecbbf5ccda47e38103666fc2c6ef19fd58b
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Mar 24 08:08:33 2023 +0800

    Add a sharded buffer to ingest data (#262)
    
    * Add a sharded buffer to ingest data
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                  |   1 +
 banyand/tsdb/buffer.go      | 213 ++++++++++++++++++++++++++++++++++++++++++++
 banyand/tsdb/buffer_test.go | 149 +++++++++++++++++++++++++++++++
 3 files changed, 363 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index d4df9af1..17c09ea6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
 - Add TSDB concept document.
 - [UI] Add YAML editor for inputting query criteria.
 - Refactor TopN to support `NULL` group while keeping seriesID from the source 
measure.
+- Add a sharded buffer to TSDB.
 
 ### Chores
 
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
new file mode 100644
index 00000000..8372554a
--- /dev/null
+++ b/banyand/tsdb/buffer.go
@@ -0,0 +1,213 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/dgraph-io/badger/v3/skl"
+       "github.com/dgraph-io/badger/v3/y"
+
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+const defaultSize = 1 << 20 // 1MB
+
// operation is a single key/value write routed to one shard bucket. epoch is
// the write's nanosecond timestamp, used as the skiplist entry version.
type operation struct {
	key   []byte
	value []byte
	epoch uint64
}

// flushEvent asks a shard's flush goroutine to persist one immutable skiplist.
type flushEvent struct {
	data *skl.Skiplist
}

// onFlush is the callback invoked to persist a skiplist of the given shard.
type onFlush func(shardIndex int, skl *skl.Skiplist) error
+
// bufferShardBucket holds per-shard state: the mutable skiplist currently
// receiving writes, the immutable skiplists waiting to be flushed, and the
// channels feeding the shard's write and flush goroutines.
type bufferShardBucket struct {
	mutable        *skl.Skiplist   // current write target
	writeCh        chan operation  // incoming writes for this shard
	flushCh        chan flushEvent // rotated skiplists awaiting flush
	writeWaitGroup *sync.WaitGroup // shared across shards; tracks write goroutines
	flushWaitGroup *sync.WaitGroup // shared across shards; tracks flush goroutines
	log            *logger.Logger
	immutables     []*skl.Skiplist // rotated skiplists, oldest first
	index          int             // this shard's index
	flushSize      int             // rotate mutable once its MemSize reaches this
	size           int             // arena capacity for each new skiplist
	mutex          sync.RWMutex    // guards mutable and immutables
}
+
// Buffer is an exported struct that represents a buffer composed of multiple
// shard buckets. Keys are hashed to a bucket; each bucket rotates and flushes
// its skiplists independently through the onFlush callback.
type Buffer struct {
	onFlushFn      onFlush
	entryCloser    *run.Closer // tracks in-flight Write calls so Close can drain them
	log            *logger.Logger
	buckets        []bufferShardBucket
	writeWaitGroup sync.WaitGroup
	flushWaitGroup sync.WaitGroup
	numShards      int
	closerOnce     sync.Once // makes Close idempotent
}
+
+// NewBuffer creates a new Buffer instance with the given parameters.
+func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, 
onFlushFn onFlush) (*Buffer, error) {
+       size := flushSize
+       if size < defaultSize {
+               size = defaultSize
+       }
+       buckets := make([]bufferShardBucket, numShards)
+       buffer := &Buffer{
+               buckets:     buckets,
+               numShards:   numShards,
+               onFlushFn:   onFlushFn,
+               entryCloser: run.NewCloser(1),
+               log:         log.Named("buffer"),
+       }
+       buffer.writeWaitGroup.Add(numShards)
+       buffer.flushWaitGroup.Add(numShards)
+       for i := 0; i < numShards; i++ {
+               buckets[i] = bufferShardBucket{
+                       index:          i,
+                       size:           size,
+                       mutable:        skl.NewSkiplist(int64(size)),
+                       flushSize:      flushSize,
+                       writeCh:        make(chan operation, writeConcurrency),
+                       flushCh:        make(chan flushEvent, 1),
+                       writeWaitGroup: &buffer.writeWaitGroup,
+                       flushWaitGroup: &buffer.flushWaitGroup,
+                       log:            
buffer.log.Named(fmt.Sprintf("shard-%d", i)),
+               }
+               buckets[i].start(onFlushFn)
+       }
+       return buffer, nil
+}
+
+// Write adds a key-value pair with a timestamp to the appropriate shard 
bucket in the buffer.
+func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
+       if !b.entryCloser.AddRunning() {
+               return
+       }
+       defer b.entryCloser.Done()
+       index := b.getShardIndex(key)
+       if b.log.Debug().Enabled() {
+               b.log.Debug().Uint64("shard", index).Bytes("key", key).
+                       Time("ts", timestamp).Msg("route a shard")
+       }
+       b.buckets[index].writeCh <- operation{key: key, value: value, epoch: 
uint64(timestamp.UnixNano())}
+}
+
+// Read retrieves the value associated with the given key and timestamp from 
the appropriate shard bucket in the buffer.
+func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
+       keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
+       index := b.getShardIndex(key)
+       epoch := uint64(ts.UnixNano())
+       for _, bk := range b.buckets[index].getAll() {
+               value := bk.Get(keyWithTS)
+               if value.Meta == 0 && value.Value == nil {
+                       continue
+               }
+               if value.Version == epoch {
+                       return value.Value, true
+               }
+       }
+       return nil, false
+}
+
// Close gracefully closes the Buffer and ensures that all pending operations
// are completed. It is safe to call more than once; only the first call has
// an effect. Close always returns nil.
func (b *Buffer) Close() error {
	b.closerOnce.Do(func() {
		// Drop the initial reference and wait for in-flight Write calls,
		// so no more operations can be enqueued after this point.
		b.entryCloser.Done()
		b.entryCloser.CloseThenWait()
		// Stop the per-shard write goroutines and wait for them to drain.
		for i := 0; i < b.numShards; i++ {
			close(b.buckets[i].writeCh)
		}
		b.writeWaitGroup.Wait()
		// Flush whatever remains in each shard's mutable skiplist, then
		// release the buffer's reference to it.
		for i := 0; i < b.numShards; i++ {
			if err := b.onFlushFn(i, b.buckets[i].mutable); err != nil {
				b.buckets[i].log.Err(err).Msg("flushing mutable buffer failed")
			}
			b.buckets[i].mutable.DecrRef()
		}
		// Finally stop the flush goroutines and wait for them to exit.
		for i := 0; i < b.numShards; i++ {
			close(b.buckets[i].flushCh)
		}
		b.flushWaitGroup.Wait()
	})
	return nil
}
+
+func (b *Buffer) getShardIndex(key []byte) uint64 {
+       return convert.Hash(key) % uint64(b.numShards)
+}
+
+func (bsb *bufferShardBucket) getAll() []*skl.Skiplist {
+       bsb.mutex.RLock()
+       defer bsb.mutex.RUnlock()
+       allList := make([]*skl.Skiplist, len(bsb.immutables)+1)
+       bsb.mutable.IncrRef()
+       allList[0] = bsb.mutable
+       last := len(bsb.immutables) - 1
+       for i := range bsb.immutables {
+               allList[i+1] = bsb.immutables[last-i]
+               bsb.immutables[last-i].IncrRef()
+       }
+       return allList
+}
+
// start launches the shard's two background goroutines: one drains flushCh
// and persists immutable skiplists through onFlushFn; the other drains
// writeCh, inserting operations into the mutable skiplist and rotating it
// to the immutable list once it reaches flushSize.
func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
	go func() {
		defer bsb.flushWaitGroup.Done()
		for event := range bsb.flushCh {
			oldSkipList := event.data
			if err := onFlushFn(bsb.index, oldSkipList); err != nil {
				// Keep the list in immutables so a later event may retry.
				bsb.log.Err(err).Msg("flushing immutable buffer failed")
				continue
			}
			bsb.mutex.Lock()
			// NOTE(review): this removes the OLDEST immutable list but
			// DecrRefs event.data; the two are assumed to be the same list.
			// If a flush event was dropped on a full channel (see the
			// default case below) they can diverge — verify upstream.
			bsb.immutables = bsb.immutables[1:]
			oldSkipList.DecrRef()
			bsb.mutex.Unlock()
		}
	}()
	go func() {
		defer bsb.writeWaitGroup.Done()
		for op := range bsb.writeCh {
			bsb.mutex.Lock()
			if bsb.mutable.MemSize() >= int64(bsb.flushSize) {
				select {
				case bsb.flushCh <- flushEvent{data: bsb.mutable}:
				default:
					// flushCh is full: the list is still rotated to
					// immutables by swap() below, but no flush event is
					// sent for it.
				}
				bsb.swap()
			}
			bsb.mutex.Unlock()
			// Put runs outside the lock; this goroutine is the only writer
			// to bsb.mutable, so the pointer read here is not contended.
			bsb.mutable.Put(y.KeyWithTs(op.key, op.epoch), y.ValueStruct{Value: op.value, Version: op.epoch})
		}
	}()
}
+
// swap moves the current mutable skiplist onto the immutable list and
// installs a fresh one. Callers must hold bsb.mutex for writing.
func (bsb *bufferShardBucket) swap() {
	bsb.immutables = append(bsb.immutables, bsb.mutable)
	bsb.mutable = skl.NewSkiplist(int64(bsb.size))
}
diff --git a/banyand/tsdb/buffer_test.go b/banyand/tsdb/buffer_test.go
new file mode 100644
index 00000000..41e654e5
--- /dev/null
+++ b/banyand/tsdb/buffer_test.go
@@ -0,0 +1,149 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package tsdb_test
+
+import (
+       "bytes"
+       "crypto/rand"
+       "fmt"
+       "math/big"
+       "sync"
+       "time"
+
+       "github.com/dgraph-io/badger/v3/skl"
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
// Buffer spec: exercises concurrent write/read round-trips and verifies that
// exceeding the flush size triggers the onFlush callback on every shard.
var _ = Describe("Buffer", func() {
	var (
		buffer *tsdb.Buffer
		log    = logger.GetLogger("buffer-test")
		goods  []gleak.Goroutine
	)

	// Snapshot running goroutines so leaks can be detected after each spec.
	BeforeEach(func() {
		goods = gleak.Goroutines()
	})
	AfterEach(func() {
		Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
	})
	Context("Write and Read", func() {
		BeforeEach(func() {
			var err error
			// 1MB flush size, write concurrency 16, 4 shards, no-op flush.
			buffer, err = tsdb.NewBuffer(log, 1024*1024, 16, 4, func(shardIndex int, skl *skl.Skiplist) error {
				return nil
			})
			Expect(err).ToNot(HaveOccurred())
		})

		AfterEach(func() {
			buffer.Close()
		})
		It("should write and read data correctly", func() {
			var wg sync.WaitGroup
			wg.Add(100)

			// 100 concurrent writers, each verifying its own read-back.
			for i := 0; i < 100; i++ {
				go func(idx int) {
					defer GinkgoRecover()
					defer wg.Done()

					key := []byte(fmt.Sprintf("key-%d", idx))
					value := []byte(fmt.Sprintf("value-%d", idx))
					ts := time.Now()

					buffer.Write(key, value, ts)
					// Writes are asynchronous (channel-based), so poll
					// until the value becomes visible.
					Eventually(func(g Gomega) {
						readValue, ok := buffer.Read(key, ts)
						g.Expect(ok).To(BeTrue())
						g.Expect(bytes.Equal(value, readValue)).To(BeTrue())
					}, flags.EventuallyTimeout).Should(Succeed())
				}(i)
			}

			wg.Wait()
		})
	})

	Context("Flush", func() {
		It("should trigger flush when buffer size exceeds the limit", func() {
			numShards := 4
			// One channel per shard, closed on that shard's first flush.
			doneChs := make([]chan struct{}, numShards)
			for i := 0; i < numShards; i++ {
				doneChs[i] = make(chan struct{})
			}

			onFlushFn := func(shardIndex int, skl *skl.Skiplist) error {
				if doneChs[shardIndex] == nil {
					return nil
				}
				close(doneChs[shardIndex])
				doneChs[shardIndex] = nil
				return nil
			}

			var wg sync.WaitGroup
			wg.Add(numShards)

			// Wait (up to 10s each) for every shard's first flush signal.
			for _, ch := range doneChs {
				go func(c <-chan struct{}) {
					select {
					case res := <-c:
						GinkgoWriter.Printf("Received value: %d\n", res)
					case <-time.After(10 * time.Second):
						GinkgoWriter.Printf("Timeout")
					}
					wg.Done()
				}(ch)
			}

			// Tiny flush size (1KB) so 1000 writes force rotations.
			buffer, err := tsdb.NewBuffer(log, 1024, 16, numShards, onFlushFn)
			defer func() {
				_ = buffer.Close()
			}()
			Expect(err).ToNot(HaveOccurred())

			randInt := func() int {
				n, err := rand.Int(rand.Reader, big.NewInt(1000))
				if err != nil {
					return 0
				}
				return int(n.Int64())
			}
			for i := 0; i < 1000; i++ {
				key := fmt.Sprintf("key-%d", randInt())
				value := fmt.Sprintf("value-%d", randInt())
				ts := time.Now()

				buffer.Write([]byte(key), []byte(value), ts)
			}

			wg.Wait()
			// Every shard must have flushed at least once (nil'ed its channel).
			for i, elem := range doneChs {
				if elem != nil {
					Fail(fmt.Sprintf("%d in doneChs is not nil", i))
				}
			}
		})
	})
})

Reply via email to