This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new aa9fb38  Introduce encoding component (#59)
aa9fb38 is described below

commit aa9fb38fb59df1ca4f61c05440089e864bcec27f
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sun Nov 14 08:16:49 2021 +0800

    Introduce encoding component (#59)
    
    * Encoding component helps the tsdb module customize how to
      encode data in a chunk
    * StreamChunkEncoder/Decoder handles the stream module's encoding
    * Update badger to support the encoding component
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 banyand/kv/badger.go         |  20 ++++
 banyand/kv/kv.go             |  40 ++++---
 banyand/stream/stream.go     |  12 +++
 banyand/tsdb/block.go        |  22 ++--
 banyand/tsdb/tsdb.go         |  27 ++++-
 banyand/tsdb/tsdb_test.go    |   9 ++
 go.mod                       |   6 +-
 go.sum                       |   8 +-
 pkg/encoding/encoding.go     |  66 ++++++++++++
 pkg/encoding/stream_chunk.go | 250 +++++++++++++++++++++++++++++++++++++++++++
 10 files changed, 419 insertions(+), 41 deletions(-)
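
The commit message above says the encoding component lets the tsdb module
customize how data is encoded in a chunk. Below is a minimal usage sketch,
assuming only the APIs visible in this diff; the root path and shard number
are illustrative, not taken from the commit.

// Sketch only: wiring the new encoding component into tsdb.OpenDatabase,
// mirroring what banyand/stream/stream.go does in this commit.
package main

import (
	"context"
	"log"

	"github.com/apache/skywalking-banyandb/banyand/tsdb"
	"github.com/apache/skywalking-banyandb/pkg/encoding"
)

const chunkSize = 1 << 20 // 1MB per chunk, the value the stream module uses

func main() {
	// Both factories must be set; OpenDatabase now fails with
	// ErrEncodingMethodAbsent when either one is nil.
	db, err := tsdb.OpenDatabase(context.Background(), tsdb.DatabaseOpts{
		Location: "/tmp/banyandb-example", // illustrative path
		ShardNum: 1,
		EncodingMethod: tsdb.EncodingMethod{
			EncoderFactory: func() encoding.SeriesEncoder {
				return encoding.NewStreamChunkEncoder(chunkSize)
			},
			DecoderFactory: func() encoding.SeriesDecoder {
				return encoding.NewStreamChunkDecoder(chunkSize)
			},
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}

OpenDatabase stores the EncodingMethod in the context it passes down, so every
block opened later (see banyand/tsdb/block.go below) can hand the factories to
kv.OpenTimeSeriesStore via TSSWithEncoding.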

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index a0b0a1b..24820f0 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -24,8 +24,10 @@ import (
        "time"
 
        "github.com/dgraph-io/badger/v3"
+       "github.com/dgraph-io/badger/v3/bydb"
        "github.com/dgraph-io/badger/v3/y"
 
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -256,3 +258,21 @@ func (l *badgerLog) Infof(f string, v ...interface{}) {
 func (l *badgerLog) Debugf(f string, v ...interface{}) {
        l.delegated.Debug().Msgf(f, v...)
 }
+
+var _ bydb.TSetDecoder = (*decoderDelegate)(nil)
+
+type decoderDelegate struct {
+       encoding.SeriesDecoder
+}
+
+func (d *decoderDelegate) Iterator() bydb.TSetIterator {
+       return &iterDelegate{
+               SeriesIterator: d.SeriesDecoder.Iterator(),
+       }
+}
+
+var _ bydb.TSetIterator = (*iterDelegate)(nil)
+
+type iterDelegate struct {
+       encoding.SeriesIterator
+}
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index fc829d3..ea92d1c 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -23,8 +23,10 @@ import (
        "math"
 
        "github.com/dgraph-io/badger/v3"
+       "github.com/dgraph-io/badger/v3/bydb"
        "github.com/pkg/errors"
 
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -101,6 +103,20 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
        }
 }
 
+func TSSWithEncoding(encoderFactory encoding.SeriesEncoderFactory, decoderFactory encoding.SeriesDecoderFactory) TimeSeriesOptions {
+       return func(store TimeSeriesStore) {
+               if btss, ok := store.(*badgerTSS); ok {
+                       btss.dbOpts = btss.dbOpts.WithExternalCompactor(func() bydb.TSetEncoder {
+                               return encoderFactory()
+                       }, func() bydb.TSetDecoder {
+                               return &decoderDelegate{
+                                       SeriesDecoder: decoderFactory(),
+                               }
+                       })
+               }
+       }
+}
+
 type Iterator interface {
        Next()
        Rewind()
@@ -125,7 +141,7 @@ type IndexStore interface {
 }
 
 // OpenTimeSeriesStore creates a new TimeSeriesStore
-func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize int, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
+func OpenTimeSeriesStore(shardID int, path string, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
        btss := new(badgerTSS)
        btss.shardID = shardID
        btss.dbOpts = badger.DefaultOptions(path)
@@ -139,7 +155,7 @@ func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize
        if err != nil {
                return nil, fmt.Errorf("failed to open time series store: %v", err)
        }
-       btss.TSet = *badger.NewTSet(btss.db, compressLevel, valueSize)
+       btss.TSet = *badger.NewTSet(btss.db)
        return btss, nil
 }
 
@@ -161,26 +177,6 @@ func StoreWithNamedLogger(name string, l *logger.Logger) StoreOptions {
        }
 }
 
-// StoreWithBufferSize sets a external logger into underlying Store
-func StoreWithBufferSize(size int64) StoreOptions {
-       return func(store Store) {
-               if bdb, ok := store.(*badgerDB); ok {
-                       bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size)
-               }
-       }
-}
-
-type FlushCallback func()
-
-// StoreWithFlushCallback sets a callback function
-func StoreWithFlushCallback(callback FlushCallback) StoreOptions {
-       return func(store Store) {
-               if bdb, ok := store.(*badgerDB); ok {
-                       bdb.dbOpts.FlushCallBack = callback
-               }
-       }
-}
-
 // OpenStore creates a new Store
 func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) {
        bdb := new(badgerDB)
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 35ae2a5..760687d 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -23,10 +23,14 @@ import (
        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
 )
 
+// a chunk is 1MB
+const chunkSize = 1 << 20
+
 type indexRule struct {
        rule       *databasev1.IndexRule
        tagIndices []partition.TagLocator
@@ -102,6 +106,14 @@ func openStream(root string, spec streamSpec, l *logger.Logger) (*stream, error)
                        Location:   root,
                        ShardNum:   sm.schema.GetOpts().GetShardNum(),
                        IndexRules: spec.indexRules,
+                       EncodingMethod: tsdb.EncodingMethod{
+                               EncoderFactory: func() encoding.SeriesEncoder {
+                                       return encoding.NewStreamChunkEncoder(chunkSize)
+                               },
+                               DecoderFactory: func() encoding.SeriesDecoder {
+                                       return encoding.NewStreamChunkDecoder(chunkSize)
+                               },
+                       },
                })
        if err != nil {
                return nil, err
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 231cef9..2f38308 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -23,6 +23,7 @@ import (
        "time"
 
        "github.com/dgraph-io/ristretto/z"
+       "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -50,11 +51,9 @@ type block struct {
 }
 
 type blockOpts struct {
-       segID         uint16
-       blockID       uint16
-       path          string
-       compressLevel int
-       valueSize     int
+       segID   uint16
+       blockID uint16
+       path    string
 }
 
 func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
@@ -71,8 +70,17 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
                        b.l = pl.Named("block")
                }
        }
-       if b.store, err = kv.OpenTimeSeriesStore(0, b.path+"/store", opts.compressLevel, opts.valueSize,
-               kv.TSSWithLogger(b.l)); err != nil {
+       encodingMethodObject := ctx.Value(encodingMethodKey)
+       if encodingMethodObject == nil {
+               return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to create a block")
+       }
+       encodingMethod := encodingMethodObject.(EncodingMethod)
+       if b.store, err = kv.OpenTimeSeriesStore(
+               0,
+               b.path+"/store",
+               kv.TSSWithEncoding(encodingMethod.EncoderFactory, encodingMethod.DecoderFactory),
+               kv.TSSWithLogger(b.l),
+       ); err != nil {
                return nil, err
        }
        if b.primaryIndex, err = lsm.NewStore(lsm.StoreOpts{
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 80a84f5..8df233b 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -31,6 +31,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -47,10 +48,16 @@ const (
        dirPerm = 0700
 )
 
-var ErrInvalidShardID = errors.New("invalid shard id")
-var indexRulesKey = contextIndexRulesKey{}
+var (
+       ErrInvalidShardID       = errors.New("invalid shard id")
+       ErrEncodingMethodAbsent = errors.New("encoding method is absent")
+
+       indexRulesKey     = contextIndexRulesKey{}
+       encodingMethodKey = contextEncodingMethodKey{}
+)
 
 type contextIndexRulesKey struct{}
+type contextEncodingMethodKey struct{}
 
 type Database interface {
        io.Closer
@@ -68,9 +75,15 @@ type Shard interface {
 var _ Database = (*database)(nil)
 
 type DatabaseOpts struct {
-       Location   string
-       ShardNum   uint32
-       IndexRules []*databasev1.IndexRule
+       Location       string
+       ShardNum       uint32
+       IndexRules     []*databasev1.IndexRule
+       EncodingMethod EncodingMethod
+}
+
+type EncodingMethod struct {
+       EncoderFactory encoding.SeriesEncoderFactory
+       DecoderFactory encoding.SeriesDecoderFactory
 }
 
 type database struct {
@@ -111,6 +124,9 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
                        db.logger = pl.Named("tsdb")
                }
        }
+       if opts.EncodingMethod.EncoderFactory == nil || opts.EncodingMethod.DecoderFactory == nil {
+               return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database")
+       }
        if _, err := mkdir(opts.Location); err != nil {
                return nil, err
        }
@@ -122,6 +138,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
        }
        thisContext := context.WithValue(ctx, logger.ContextKey, db.logger)
        thisContext = context.WithValue(thisContext, indexRulesKey, opts.IndexRules)
+       thisContext = context.WithValue(thisContext, encodingMethodKey, opts.EncodingMethod)
        if len(entries) > 0 {
                return loadDatabase(thisContext, db)
        }
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 3f91e40..89c56d6 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -27,6 +27,7 @@ import (
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
 )
@@ -56,6 +57,14 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database
                DatabaseOpts{
                        Location: tempDir,
                        ShardNum: 1,
+                       EncodingMethod: EncodingMethod{
+                               EncoderFactory: func() encoding.SeriesEncoder {
+                                       return nil
+                               },
+                               DecoderFactory: func() encoding.SeriesDecoder {
+                                       return nil
+                               },
+                       },
                })
        t.NoError(err)
        t.NotNil(db)
diff --git a/go.mod b/go.mod
index a1ba284..24e4ce0 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,7 @@ require (
        github.com/golang/protobuf v1.5.2
        github.com/google/go-cmp v0.5.6
        github.com/google/uuid v1.3.0
-       github.com/klauspost/compress v1.13.1 // indirect
+       github.com/klauspost/compress v1.13.1
        github.com/oklog/run v1.1.0
        github.com/pkg/errors v0.9.1
        github.com/rs/zerolog v1.23.0
@@ -43,7 +43,7 @@ require (
        github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
        github.com/golang/snappy v0.0.3 // indirect
        github.com/google/btree v1.0.1 // indirect
-       github.com/google/flatbuffers v1.12.0 // indirect
+       github.com/google/flatbuffers v1.12.1 // indirect
        github.com/gorilla/websocket v1.4.2 // indirect
        github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
        github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
@@ -101,4 +101,4 @@ require (
        sigs.k8s.io/yaml v1.2.0 // indirect
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476
diff --git a/go.sum b/go.sum
index 4ed137e..957109b 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
 github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
 github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165 h1:csoTNiGUMtp4H1AchgaZWJ4WY4uJQ6s+pz3sXS93jAA=
-github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
+github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 h1:MH/Jy2x3WF3RdD+WD25XepG4fzIz3qMOoIUM4Enn+GA=
+github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -192,8 +192,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
 github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
-github.com/google/flatbuffers v1.12.0 h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w=
-github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
+github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw=
+github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
new file mode 100644
index 0000000..5e04e72
--- /dev/null
+++ b/pkg/encoding/encoding.go
@@ -0,0 +1,66 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import "github.com/pkg/errors"
+
+var ErrEncodeEmpty = errors.New("encode an empty value")
+
+type SeriesEncoderFactory func() SeriesEncoder
+
+// SeriesEncoder encodes time series data points
+type SeriesEncoder interface {
+       // Append a data point
+       Append(ts uint64, value []byte)
+       // IsFull returns whether the encoded data reached its capacity
+       IsFull() bool
+       // Reset the underlying buffer
+       Reset()
+       // Encode the time series data points into binary
+       Encode() ([]byte, error)
+       // StartTime indicates the first entry's time
+       StartTime() uint64
+}
+
+type SeriesDecoderFactory func() SeriesDecoder
+
+// SeriesDecoder decodes encoded time series data
+type SeriesDecoder interface {
+       // Decode the time series data
+       Decode(data []byte) error
+       // Len returns the number of data points in the decoded data
+       Len() int
+       // IsFull returns whether the encoded data reached its capacity
+       IsFull() bool
+       // Get the data point by its time
+       Get(ts uint64) ([]byte, error)
+       // Iterator returns a SeriesIterator
+       Iterator() SeriesIterator
+}
+
+// SeriesIterator iterates time series data
+type SeriesIterator interface {
+       // Next scrolls the cursor to the next data point
+       Next() bool
+       // Val returns the value of the current data point
+       Val() []byte
+       // Time returns the time of the current data point
+       Time() uint64
+       // Error might return an error indicating a decode failure
+       Error() error
+}
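
For orientation, a hedged sketch of what a custom SeriesEncoder could look like
against the interface above: a naive in-memory encoder with no compression. It
is not part of this commit; plainEncoder, NewPlainEncoder, and the
[ts][len][value] record layout are invented here purely to illustrate the
contract.

// Illustrative only: a naive SeriesEncoder that keeps points in memory
// and serializes them without compression. It demonstrates the contract
// the kv/tsdb layers rely on; it is not part of this commit.
package encoding

import (
	"bytes"
	"encoding/binary"
)

type plainEncoder struct {
	ts    []uint64
	vals  [][]byte
	limit int // maximum number of points before IsFull reports true
}

func NewPlainEncoder(limit int) SeriesEncoder { return &plainEncoder{limit: limit} }

// Append records one data point.
func (p *plainEncoder) Append(ts uint64, value []byte) {
	p.ts = append(p.ts, ts)
	p.vals = append(p.vals, value)
}

// IsFull reports whether the point limit has been reached.
func (p *plainEncoder) IsFull() bool { return len(p.vals) >= p.limit }

// Reset drops all buffered points.
func (p *plainEncoder) Reset() { p.ts, p.vals = p.ts[:0], p.vals[:0] }

// StartTime assumes points were appended in ascending time order.
func (p *plainEncoder) StartTime() uint64 {
	if len(p.ts) == 0 {
		return 0
	}
	return p.ts[0]
}

// Encode writes [ts uint64][len uint32][value] records back to back.
func (p *plainEncoder) Encode() ([]byte, error) {
	if len(p.vals) == 0 {
		return nil, ErrEncodeEmpty
	}
	var buf bytes.Buffer
	for i, v := range p.vals {
		_ = binary.Write(&buf, binary.LittleEndian, p.ts[i])
		_ = binary.Write(&buf, binary.LittleEndian, uint32(len(v)))
		buf.Write(v)
	}
	return buf.Bytes(), nil
}

A SeriesDecoder counterpart would parse the same records back and expose them
through a SeriesIterator; TSSWithEncoding in banyand/kv then adapts both
factories onto badger's external compactor hooks.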
diff --git a/pkg/encoding/stream_chunk.go b/pkg/encoding/stream_chunk.go
new file mode 100644
index 0000000..7ff5094
--- /dev/null
+++ b/pkg/encoding/stream_chunk.go
@@ -0,0 +1,250 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "sort"
+
+       "github.com/klauspost/compress/zstd"
+       "github.com/pkg/errors"
+)
+
+var (
+       decoder, _               = zstd.NewReader(nil)
+       encoder, _               = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
+       _          SeriesEncoder = (*streamChunkEncoder)(nil)
+       _          SeriesDecoder = (*StreamChunkDecoder)(nil)
+)
+
+// streamChunkEncoder buffers time series data points and encodes them into a zstd-compressed chunk
+type streamChunkEncoder struct {
+       tsBuff    bytes.Buffer
+       valBuff   bytes.Buffer
+       scratch   [binary.MaxVarintLen64]byte
+       len       uint32
+       num       uint32
+       startTime uint64
+       valueSize int
+}
+
+func NewStreamChunkEncoder(size int) SeriesEncoder {
+       return &streamChunkEncoder{
+               valueSize: size,
+       }
+}
+
+func (t *streamChunkEncoder) Append(ts uint64, value []byte) {
+       if t.startTime == 0 {
+               t.startTime = ts
+       } else if t.startTime > ts {
+               t.startTime = ts
+       }
+       vLen := len(value)
+       offset := uint32(len(t.valBuff.Bytes()))
+       t.valBuff.Write(t.putUint32(uint32(vLen)))
+       t.valBuff.Write(value)
+       t.tsBuff.Write(t.putUint64(ts))
+       t.tsBuff.Write(t.putUint32(offset))
+       t.num = t.num + 1
+}
+
+func (t *streamChunkEncoder) IsFull() bool {
+       return t.valBuff.Len() >= t.valueSize
+}
+
+func (t *streamChunkEncoder) Reset() {
+       t.tsBuff.Reset()
+       t.valBuff.Reset()
+       t.num = 0
+       t.startTime = 0
+}
+
+func (t *streamChunkEncoder) Encode() ([]byte, error) {
+       if t.tsBuff.Len() < 1 {
+               return nil, ErrEncodeEmpty
+       }
+       val := t.valBuff.Bytes()
+       t.len = uint32(len(val))
+       _, err := t.tsBuff.WriteTo(&t.valBuff)
+       if err != nil {
+               return nil, err
+       }
+       t.valBuff.Write(t.putUint32(t.num))
+       t.valBuff.Write(t.putUint32(t.len))
+       data := t.valBuff.Bytes()
+       l := len(data)
+       dst := make([]byte, 0, compressBound(l))
+       dst = encoder.EncodeAll(data, dst)
+       result := make([]byte, len(dst)+2)
+       copy(result, dst)
+       copy(result[len(dst):], t.putUint16(uint16(l)))
+       return result, nil
+}
+
+func compressBound(srcSize int) int {
+       return srcSize + (srcSize >> 8)
+}
+
+func (t *streamChunkEncoder) StartTime() uint64 {
+       return t.startTime
+}
+
+func (t *streamChunkEncoder) putUint16(v uint16) []byte {
+       binary.LittleEndian.PutUint16(t.scratch[:], v)
+       return t.scratch[:2]
+}
+
+func (t *streamChunkEncoder) putUint32(v uint32) []byte {
+       binary.LittleEndian.PutUint32(t.scratch[:], v)
+       return t.scratch[:4]
+}
+
+func (t *streamChunkEncoder) putUint64(v uint64) []byte {
+       binary.LittleEndian.PutUint64(t.scratch[:], v)
+       return t.scratch[:8]
+}
+
+const (
+       // TsLen equals ts(uint64) + data_offset(uint32)
+       TsLen = 8 + 4
+)
+
+var ErrInvalidValue = errors.New("invalid encoded value")
+
+// StreamChunkDecoder decodes a chunk produced by streamChunkEncoder
+type StreamChunkDecoder struct {
+       ts        []byte
+       val       []byte
+       len       uint32
+       num       uint32
+       valueSize int
+}
+
+func NewStreamChunkDecoder(size int) SeriesDecoder {
+       return &StreamChunkDecoder{
+               valueSize: size,
+       }
+}
+
+func (t *StreamChunkDecoder) Len() int {
+       return int(t.num)
+}
+
+func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
+       var data []byte
+       size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
+       if data, err = decoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
+               return err
+       }
+       l := uint32(len(data))
+       if l <= 8 {
+               return ErrInvalidValue
+       }
+       lenOffset := len(data) - 4
+       numOffset := lenOffset - 4
+       t.num = binary.LittleEndian.Uint32(data[numOffset:lenOffset])
+       t.len = binary.LittleEndian.Uint32(data[lenOffset:])
+       if l <= t.len+8 {
+               return ErrInvalidValue
+       }
+       t.val = data[:t.len]
+       t.ts = data[t.len:numOffset]
+       return nil
+}
+
+func (t *StreamChunkDecoder) IsFull() bool {
+       return int(t.len) >= t.valueSize
+}
+
+func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
+       i := sort.Search(int(t.num), func(i int) bool {
+               slot := getTSSlot(t.ts, i)
+               return parseTS(slot) <= ts
+       })
+       if i >= int(t.num) {
+               return nil, fmt.Errorf("%d doesn't exist", ts)
+       }
+       slot := getTSSlot(t.ts, i)
+       if parseTS(slot) != ts {
+               return nil, fmt.Errorf("%d doesn't exist", ts)
+       }
+       return getVal(t.val, parseOffset(slot))
+}
+
+func (t *StreamChunkDecoder) Iterator() SeriesIterator {
+       return newBlockItemIterator(t)
+}
+
+func getVal(buf []byte, offset uint32) ([]byte, error) {
+       if uint32(len(buf)) <= offset+4 {
+               return nil, ErrInvalidValue
+       }
+       dataLen := binary.LittleEndian.Uint32(buf[offset : offset+4])
+       return buf[offset+4 : offset+4+dataLen], nil
+}
+
+func getTSSlot(data []byte, index int) []byte {
+       return data[index*TsLen : (index+1)*TsLen]
+}
+
+func parseTS(tsSlot []byte) uint64 {
+       return binary.LittleEndian.Uint64(tsSlot[:8])
+}
+
+func parseOffset(tsSlot []byte) uint32 {
+       return binary.LittleEndian.Uint32(tsSlot[8:])
+}
+
+var _ SeriesIterator = (*chunkIterator)(nil)
+
+type chunkIterator struct {
+       index []byte
+       data  []byte
+       idx   int
+       num   int
+}
+
+func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator {
+       return &chunkIterator{
+               idx:   -1,
+               index: decoder.ts,
+               data:  decoder.val,
+               num:   int(decoder.num),
+       }
+}
+
+func (b *chunkIterator) Next() bool {
+       b.idx++
+       return b.idx >= 0 && b.idx < b.num
+}
+
+func (b *chunkIterator) Val() []byte {
+       v, _ := getVal(b.data, parseOffset(getTSSlot(b.index, b.idx)))
+       return v
+}
+
+func (b *chunkIterator) Time() uint64 {
+       return parseTS(getTSSlot(b.index, b.idx))
+}
+
+func (b *chunkIterator) Error() error {
+       return nil
+}
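
Reading the code above, an encoded chunk appears to be laid out as
zstd(values || timestamp index || num uint32 || len uint32) followed by a
2-byte little-endian uncompressed size, where each value is a uint32 length
plus payload and each index slot is a uint64 timestamp plus a uint32 value
offset. A round-trip sketch under that reading; the timestamps and payloads
below are made up:

// Round-trip sketch: encode two points into a chunk, then decode and
// iterate over them. Assumes a throwaway main package.
package main

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/pkg/encoding"
)

func main() {
	const chunkSize = 1 << 20 // same 1MB target the stream module uses

	enc := encoding.NewStreamChunkEncoder(chunkSize)
	enc.Append(1636848000000000000, []byte("element-a")) // made-up data
	enc.Append(1636848001000000000, []byte("element-b"))

	chunk, err := enc.Encode() // compressed chunk + 2-byte raw-size suffix
	if err != nil {
		panic(err)
	}

	dec := encoding.NewStreamChunkDecoder(chunkSize)
	if err := dec.Decode(chunk); err != nil {
		panic(err)
	}
	iter := dec.Iterator()
	for iter.Next() {
		fmt.Printf("ts=%d val=%s\n", iter.Time(), iter.Val())
	}
}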

Reply via email to