This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch stream-ele
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 672dd60279e14fc45cf540fce99c31612d937de0
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Aug 1 12:57:20 2024 +0800

    Merge elementIDs.bin and timestamps.bin
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                            |   1 +
 banyand/measure/block.go              |  28 ++++------
 banyand/stream/block.go               | 100 ++++++++--------------------------
 banyand/stream/block_metadata.go      |  20 +++++--
 banyand/stream/block_metadata_test.go |  31 +++++++----
 banyand/stream/block_reader.go        |   3 -
 banyand/stream/block_test.go          |  83 ++++++----------------------
 banyand/stream/block_writer.go        |   7 +--
 banyand/stream/part.go                |   8 ---
 banyand/stream/snapshot_test.go       |   1 -
 banyand/stream/stream.go              |   1 -
 pkg/encoding/int_list.go              |   8 +--
 12 files changed, 92 insertions(+), 199 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 62f8929d..50884405 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -10,6 +10,7 @@ Release Notes.
 - Move the series index into segment.
 - Swap the segment and the shard.
 - Move indexed values in a measure from data files to index files.
+- Merge elementIDs.bin and timestamps.bin into a single file.
 
 ### Features
 
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 93376790..001940cc 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -380,25 +380,21 @@ func mustSeqReadTimestampsFrom(timestamps, versions 
[]int64, tm *timestampsMetad
 
 func mustDecodeTimestampsWithVersions(timestamps, versions []int64, tm 
*timestampsMetadata, count int, path string, src []byte) ([]int64, []int64) {
        var err error
-       if t := encoding.GetCommonType(tm.encodeType); t != 
encoding.EncodeTypeUnknown {
-               if tm.size < tm.versionOffset {
-                       logger.Panicf("size %d must be greater than 
versionOffset %d", tm.size, tm.versionOffset)
-               }
-               timestamps, err = encoding.BytesToInt64List(timestamps, 
src[:tm.versionOffset], t, tm.min, count)
-               if err != nil {
-                       logger.Panicf("%s: cannot unmarshal timestamps with 
versions: %v", path, err)
-               }
-               versions, err = encoding.BytesToInt64List(versions, 
src[tm.versionOffset:], tm.versionEncodeType, tm.versionFirst, count)
-               if err != nil {
-                       logger.Panicf("%s: cannot unmarshal versions: %v", 
path, err)
-               }
-               return timestamps, versions
+       t := encoding.GetCommonType(tm.encodeType)
+       if t == encoding.EncodeTypeUnknown {
+               logger.Panicf("unexpected encodeType %d", tm.encodeType)
+       }
+       if tm.size < tm.versionOffset {
+               logger.Panicf("size %d must be greater than versionOffset %d", 
tm.size, tm.versionOffset)
+       }
+       timestamps, err = encoding.BytesToInt64List(timestamps, 
src[:tm.versionOffset], t, tm.min, count)
+       if err != nil {
+               logger.Panicf("%s: cannot unmarshal timestamps with versions: 
%v", path, err)
        }
-       timestamps, err = encoding.BytesToInt64List(timestamps, src, 
tm.encodeType, tm.min, count)
+       versions, err = encoding.BytesToInt64List(versions, 
src[tm.versionOffset:], tm.versionEncodeType, tm.versionFirst, count)
        if err != nil {
-               logger.Panicf("%s: cannot unmarshal timestamps: %v", path, err)
+               logger.Panicf("%s: cannot unmarshal versions: %v", path, err)
        }
-       versions = encoding.ExtendInt64ListCapacity(versions, count)
        return timestamps, versions
 }
 
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 2f2fc3b5..bd0db065 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -26,7 +26,6 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
-       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -128,8 +127,7 @@ func (b *block) mustWriteTo(sid common.SeriesID, bm 
*blockMetadata, ww *writers)
        bm.uncompressedSizeBytes = b.uncompressedSizeBytes()
        bm.count = uint64(b.Len())
 
-       mustWriteTimestampsTo(&bm.timestamps, b.timestamps, 
&ww.timestampsWriter)
-       mustWriteElementIDsTo(&bm.elementIDs, b.elementIDs, 
&ww.elementIDsWriter)
+       mustWriteTimestampsTo(&bm.timestamps, b.timestamps, b.elementIDs, 
&ww.timestampsWriter)
 
        for ti := range b.tagFamilies {
                b.marshalTagFamily(b.tagFamilies[ti], bm, ww)
@@ -256,8 +254,7 @@ func (b *block) uncompressedSizeBytes() uint64 {
 func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm 
blockMetadata) {
        b.reset()
 
-       b.timestamps = mustReadTimestampsFrom(b.timestamps, &bm.timestamps, 
int(bm.count), p.timestamps)
-       b.elementIDs = mustReadElementIDsFrom(b.elementIDs, &bm.elementIDs, 
int(bm.count), p.elementIDs)
+       b.timestamps, b.elementIDs = mustReadTimestampsFrom(b.timestamps, 
b.elementIDs, &bm.timestamps, int(bm.count), p.timestamps)
 
        _ = b.resizeTagFamilies(len(bm.tagProjection))
        for i := range bm.tagProjection {
@@ -275,8 +272,7 @@ func (b *block) mustReadFrom(decoder 
*encoding.BytesBlockDecoder, p *part, bm bl
 func (b *block) mustSeqReadFrom(decoder *encoding.BytesBlockDecoder, 
seqReaders *seqReaders, bm blockMetadata) {
        b.reset()
 
-       b.timestamps = mustSeqReadTimestampsFrom(b.timestamps, &bm.timestamps, 
int(bm.count), &seqReaders.timestamps)
-       b.elementIDs = mustSeqReadElementIDsFrom(b.elementIDs, &bm.elementIDs, 
int(bm.count), &seqReaders.elementIDs)
+       b.timestamps, b.elementIDs = mustSeqReadTimestampsFrom(b.timestamps, 
b.elementIDs, &bm.timestamps, int(bm.count), &seqReaders.timestamps)
 
        _ = b.resizeTagFamilies(len(bm.tagFamilies))
        keys := make([]string, 0, len(bm.tagFamilies))
@@ -298,71 +294,48 @@ func (b *block) sortTagFamilies() {
        })
 }
 
-func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps []int64, 
timestampsWriter *writer) {
+func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps []int64, 
elementIDs []uint64, timestampsWriter *writer) {
        tm.reset()
 
        bb := bigValuePool.Generate()
        defer bigValuePool.Release(bb)
        bb.Buf, tm.encodeType, tm.min = encoding.Int64ListToBytes(bb.Buf[:0], 
timestamps)
-       if len(bb.Buf) > maxTimestampsBlockSize {
-               logger.Panicf("too big block with timestamps: %d bytes; the 
maximum supported size is %d bytes", len(bb.Buf), maxTimestampsBlockSize)
-       }
        tm.max = timestamps[len(timestamps)-1]
        tm.offset = timestampsWriter.bytesWritten
-       tm.size = uint64(len(bb.Buf))
+       tm.elementIDsOffset = uint64(len(bb.Buf))
+       timestampsWriter.MustWrite(bb.Buf)
+       bb.Buf = encoding.VarUint64sToBytes(bb.Buf[:0], elementIDs)
+       tm.size = tm.elementIDsOffset + uint64(len(bb.Buf))
        timestampsWriter.MustWrite(bb.Buf)
 }
 
-func mustReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, 
reader fs.Reader) []int64 {
+func mustReadTimestampsFrom(timestamps []int64, elementIDs []uint64, tm 
*timestampsMetadata, count int, reader fs.Reader) ([]int64, []uint64) {
        bb := bigValuePool.Generate()
        defer bigValuePool.Release(bb)
        bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size))
        fs.MustReadData(reader, int64(tm.offset), bb.Buf)
-       var err error
-       dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.encodeType, 
tm.min, count)
-       if err != nil {
-               logger.Panicf("%s: cannot unmarshal timestamps: %v", 
reader.Path(), err)
-       }
-       return dst
+       return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, 
count, reader.Path(), bb.Buf)
 }
 
-func mustWriteElementIDsTo(em *elementIDsMetadata, elementIDs []uint64, 
elementIDsWriter *writer) {
-       em.reset()
-
-       bb := bigValuePool.Generate()
-       defer bigValuePool.Release(bb)
-       elementIDsByteSlice := make([][]byte, len(elementIDs))
-       for i, elementID := range elementIDs {
-               elementIDsByteSlice[i] = convert.Uint64ToBytes(elementID)
-       }
-       bb.Buf = encoding.EncodeBytesBlock(bb.Buf, elementIDsByteSlice)
-       if len(bb.Buf) > maxElementIDsBlockSize {
-               logger.Panicf("too big block with elementIDs: %d bytes; the 
maximum supported size is %d bytes", len(bb.Buf), maxElementIDsBlockSize)
+func mustDecodeTimestampsWithVersions(timestamps []int64, elementIDs []uint64, 
tm *timestampsMetadata, count int, path string, src []byte) ([]int64, []uint64) 
{
+       if tm.size < tm.elementIDsOffset {
+               logger.Panicf("size %d must be greater than elementIDsOffset 
%d", tm.size, tm.elementIDsOffset)
        }
-       em.encodeType = encoding.EncodeTypeUnknown
-       em.offset = elementIDsWriter.bytesWritten
-       em.size = uint64(len(bb.Buf))
-       elementIDsWriter.MustWrite(bb.Buf)
-}
-
-func mustReadElementIDsFrom(dst []uint64, em *elementIDsMetadata, count int, 
reader fs.Reader) []uint64 {
-       bb := bigValuePool.Generate()
-       defer bigValuePool.Release(bb)
-       bb.Buf = bytes.ResizeExact(bb.Buf, int(em.size))
-       fs.MustReadData(reader, int64(em.offset), bb.Buf)
-       decoder := encoding.BytesBlockDecoder{}
-       var elementIDsByteSlice [][]byte
-       elementIDsByteSlice, err := decoder.Decode(elementIDsByteSlice, bb.Buf, 
uint64(count))
+       var err error
+       timestamps, err = encoding.BytesToInt64List(timestamps, 
src[:tm.elementIDsOffset], tm.encodeType, tm.min, count)
        if err != nil {
-               logger.Panicf("%s: cannot unmarshal elementIDs: %v", 
reader.Path(), err)
+               logger.Panicf("%s: cannot unmarshal timestamps: %v", path, err)
        }
-       for _, elementID := range elementIDsByteSlice {
-               dst = append(dst, convert.BytesToUint64(elementID))
+       elementIDs = encoding.ExtendListCapacity(elementIDs, count)
+       elementIDs = elementIDs[:count]
+       _, err = encoding.BytesToVarUint64s(elementIDs, 
src[tm.elementIDsOffset:])
+       if err != nil {
+               logger.Panicf("%s: cannot unmarshal element ids: %v", path, err)
        }
-       return dst
+       return timestamps, elementIDs
 }
 
-func mustSeqReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, 
reader *seqReader) []int64 {
+func mustSeqReadTimestampsFrom(timestamps []int64, elementIDs []uint64, tm 
*timestampsMetadata, count int, reader *seqReader) ([]int64, []uint64) {
        if tm.offset != reader.bytesRead {
                logger.Panicf("offset %d must be equal to bytesRead %d", 
tm.offset, reader.bytesRead)
        }
@@ -370,32 +343,7 @@ func mustSeqReadTimestampsFrom(dst []int64, tm 
*timestampsMetadata, count int, r
        defer bigValuePool.Release(bb)
        bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size))
        reader.mustReadFull(bb.Buf)
-       var err error
-       dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.encodeType, 
tm.min, count)
-       if err != nil {
-               logger.Panicf("%s: cannot unmarshal timestamps: %v", 
reader.Path(), err)
-       }
-       return dst
-}
-
-func mustSeqReadElementIDsFrom(dst []uint64, em *elementIDsMetadata, count 
int, reader *seqReader) []uint64 {
-       if em.offset != reader.bytesRead {
-               logger.Panicf("offset %d must be equal to bytesRead %d", 
em.offset, reader.bytesRead)
-       }
-       bb := bigValuePool.Generate()
-       defer bigValuePool.Release(bb)
-       bb.Buf = bytes.ResizeExact(bb.Buf, int(em.size))
-       reader.mustReadFull(bb.Buf)
-       decoder := encoding.BytesBlockDecoder{}
-       var elementIDsByteSlice [][]byte
-       elementIDsByteSlice, err := decoder.Decode(elementIDsByteSlice, bb.Buf, 
uint64(count))
-       if err != nil {
-               logger.Panicf("%s: cannot unmarshal elementIDs: %v", 
reader.Path(), err)
-       }
-       for _, elementID := range elementIDsByteSlice {
-               dst = append(dst, convert.BytesToUint64(elementID))
-       }
-       return dst
+       return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, 
count, reader.Path(), bb.Buf)
 }
 
 func generateBlock() *block {
diff --git a/banyand/stream/block_metadata.go b/banyand/stream/block_metadata.go
index 92aadda2..9f0091ee 100644
--- a/banyand/stream/block_metadata.go
+++ b/banyand/stream/block_metadata.go
@@ -240,9 +240,10 @@ func releaseBlockMetadataArray(bma *blockMetadataArray) {
 
 type timestampsMetadata struct {
        dataBlock
-       min        int64
-       max        int64
-       encodeType encoding.EncodeType
+       min              int64
+       max              int64
+       elementIDsOffset uint64
+       encodeType       encoding.EncodeType
 }
 
 func (tm *timestampsMetadata) reset() {
@@ -250,6 +251,7 @@ func (tm *timestampsMetadata) reset() {
        tm.min = 0
        tm.max = 0
        tm.encodeType = 0
+       tm.elementIDsOffset = 0
 }
 
 func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) {
@@ -257,6 +259,7 @@ func (tm *timestampsMetadata) copyFrom(src 
*timestampsMetadata) {
        tm.min = src.min
        tm.max = src.max
        tm.encodeType = src.encodeType
+       tm.elementIDsOffset = src.elementIDsOffset
 }
 
 func (tm *timestampsMetadata) marshal(dst []byte) []byte {
@@ -264,20 +267,27 @@ func (tm *timestampsMetadata) marshal(dst []byte) []byte {
        dst = encoding.Uint64ToBytes(dst, uint64(tm.min))
        dst = encoding.Uint64ToBytes(dst, uint64(tm.max))
        dst = append(dst, byte(tm.encodeType))
+       dst = encoding.VarUint64ToBytes(dst, tm.elementIDsOffset)
        return dst
 }
 
 func (tm *timestampsMetadata) unmarshal(src []byte) ([]byte, error) {
        src, err := tm.dataBlock.unmarshal(src)
        if err != nil {
-               return nil, fmt.Errorf("cannot unmarshal dataBlock: %w", err)
+               return nil, fmt.Errorf("cannot unmarshal ts blockData: %w", err)
        }
        tm.min = int64(encoding.BytesToUint64(src))
        src = src[8:]
        tm.max = int64(encoding.BytesToUint64(src))
        src = src[8:]
        tm.encodeType = encoding.EncodeType(src[0])
-       return src[1:], nil
+       src = src[1:]
+       src, n, err := encoding.BytesToVarUint64(src)
+       if err != nil {
+               return nil, fmt.Errorf("cannot unmarshal ts offset: %w", err)
+       }
+       tm.elementIDsOffset = n
+       return src, nil
 }
 
 type elementIDsMetadata struct {
diff --git a/banyand/stream/block_metadata_test.go 
b/banyand/stream/block_metadata_test.go
index 5c721b7d..00751859 100644
--- a/banyand/stream/block_metadata_test.go
+++ b/banyand/stream/block_metadata_test.go
@@ -79,9 +79,10 @@ func Test_timestampsMetadata_reset(t *testing.T) {
                        offset: 1,
                        size:   1,
                },
-               min:        1,
-               max:        1,
-               encodeType: encoding.EncodeTypeConst,
+               min:              1,
+               max:              1,
+               encodeType:       encoding.EncodeTypeConst,
+               elementIDsOffset: 1,
        }
 
        tm.reset()
@@ -90,6 +91,7 @@ func Test_timestampsMetadata_reset(t *testing.T) {
        assert.Equal(t, uint64(0), tm.dataBlock.size)
        assert.Equal(t, int64(0), tm.min)
        assert.Equal(t, int64(0), tm.max)
+       assert.Equal(t, uint64(0), tm.elementIDsOffset)
        assert.Equal(t, encoding.EncodeTypeUnknown, tm.encodeType)
 }
 
@@ -99,9 +101,10 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) {
                        offset: 1,
                        size:   1,
                },
-               min:        1,
-               max:        1,
-               encodeType: encoding.EncodeTypeConst,
+               min:              1,
+               max:              1,
+               encodeType:       encoding.EncodeTypeConst,
+               elementIDsOffset: 1,
        }
 
        dest := &timestampsMetadata{
@@ -109,9 +112,10 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) {
                        offset: 2,
                        size:   2,
                },
-               min:        2,
-               max:        2,
-               encodeType: encoding.EncodeTypeDelta,
+               min:              2,
+               max:              2,
+               encodeType:       encoding.EncodeTypeDelta,
+               elementIDsOffset: 2,
        }
 
        dest.copyFrom(src)
@@ -121,6 +125,7 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) {
        assert.Equal(t, src.min, dest.min)
        assert.Equal(t, src.max, dest.max)
        assert.Equal(t, src.encodeType, dest.encodeType)
+       assert.Equal(t, src.elementIDsOffset, dest.elementIDsOffset)
 }
 
 func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) {
@@ -129,9 +134,10 @@ func Test_timestampsMetadata_marshal_unmarshal(t 
*testing.T) {
                        offset: 1,
                        size:   1,
                },
-               min:        1,
-               max:        1,
-               encodeType: encoding.EncodeTypeConst,
+               min:              1,
+               max:              1,
+               encodeType:       encoding.EncodeTypeConst,
+               elementIDsOffset: 1,
        }
 
        marshaled := original.marshal(nil)
@@ -146,6 +152,7 @@ func Test_timestampsMetadata_marshal_unmarshal(t 
*testing.T) {
        assert.Equal(t, original.min, unmarshaled.min)
        assert.Equal(t, original.max, unmarshaled.max)
        assert.Equal(t, original.encodeType, unmarshaled.encodeType)
+       assert.Equal(t, original.elementIDsOffset, unmarshaled.elementIDsOffset)
 }
 
 func Test_blockMetadata_marshal_unmarshal(t *testing.T) {
diff --git a/banyand/stream/block_reader.go b/banyand/stream/block_reader.go
index c952aa77..60701515 100644
--- a/banyand/stream/block_reader.go
+++ b/banyand/stream/block_reader.go
@@ -87,13 +87,11 @@ type seqReaders struct {
        tagFamilies       map[string]*seqReader
        primary           seqReader
        timestamps        seqReader
-       elementIDs        seqReader
 }
 
 func (sr *seqReaders) reset() {
        sr.primary.reset()
        sr.timestamps.reset()
-       sr.elementIDs.reset()
        if sr.tagFamilyMetadata != nil {
                for k, r := range sr.tagFamilyMetadata {
                        releaseSeqReader(r)
@@ -112,7 +110,6 @@ func (sr *seqReaders) init(p *part) {
        sr.reset()
        sr.primary.init(p.primary)
        sr.timestamps.init(p.timestamps)
-       sr.elementIDs.init(p.elementIDs)
        if sr.tagFamilies == nil {
                sr.tagFamilies = make(map[string]*seqReader)
                sr.tagFamilyMetadata = make(map[string]*seqReader)
diff --git a/banyand/stream/block_test.go b/banyand/stream/block_test.go
index 9c3bf7ed..ed57f48d 100644
--- a/banyand/stream/block_test.go
+++ b/banyand/stream/block_test.go
@@ -18,8 +18,6 @@
 package stream
 
 import (
-       "crypto/rand"
-       "encoding/binary"
        "reflect"
        "testing"
 
@@ -215,20 +213,16 @@ func marshalIntArr(arr [][]byte) []byte {
 
 func Test_mustWriteAndReadTimestamps(t *testing.T) {
        tests := []struct {
-               name      string
-               args      []int64
-               wantPanic bool
-               wantTM    timestampsMetadata
+               name       string
+               timestamps []int64
+               elementIDs []uint64
+               wantPanic  bool
+               wantTM     timestampsMetadata
        }{
                {
-                       name:      "Test mustWriteAndReadTimestamps",
-                       args:      []int64{1, 2, 3, 4, 5},
-                       wantPanic: false,
-               },
-               {
-                       name:      "Test mustWriteAndReadTimestamps with panic",
-                       args:      getBitInt64Arr(),
-                       wantPanic: true,
+                       name:       "Test mustWriteAndReadTimestamps",
+                       timestamps: []int64{1, 2, 3, 4, 5},
+                       elementIDs: []uint64{0, 1, 2, 3, 4},
                },
        }
        for _, tt := range tests {
@@ -243,60 +237,18 @@ func Test_mustWriteAndReadTimestamps(t *testing.T) {
                        b := &bytes.Buffer{}
                        w := new(writer)
                        w.init(b)
-                       mustWriteTimestampsTo(tm, tt.args, w)
-                       timestamps := mustReadTimestampsFrom(nil, tm, 
len(tt.args), b)
-                       if !reflect.DeepEqual(timestamps, tt.args) {
-                               t.Errorf("mustReadTimestampsFrom() = %v, want 
%v", timestamps, tt.args)
+                       mustWriteTimestampsTo(tm, tt.timestamps, tt.elementIDs, 
w)
+                       timestamps, elementIDs := mustReadTimestampsFrom(nil, 
nil, tm, len(tt.timestamps), b)
+                       if !reflect.DeepEqual(timestamps, tt.timestamps) {
+                               t.Errorf("mustReadTimestampsFrom() timestamps = 
%v, want %v", timestamps, tt.timestamps)
                        }
-               })
-       }
-}
-
-func Test_mustWriteAndReadElementIDs(t *testing.T) {
-       tests := []struct {
-               name      string
-               args      []uint64
-               wantPanic bool
-               wantTM    elementIDsMetadata
-       }{
-               {
-                       name:      "Test mustWriteAndReadElementIDs",
-                       args:      []uint64{0, 1, 2, 3, 4},
-                       wantPanic: false,
-               },
-       }
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       defer func() {
-                               r := recover()
-                               if (r != nil) != tt.wantPanic {
-                                       t.Errorf("mustWriteElementIDs() recover 
= %v, wantPanic = %v", r, tt.wantPanic)
-                               }
-                       }()
-                       em := &elementIDsMetadata{}
-                       b := &bytes.Buffer{}
-                       w := new(writer)
-                       w.init(b)
-                       mustWriteElementIDsTo(em, tt.args, w)
-                       elementIDs := mustReadElementIDsFrom(nil, em, 
len(tt.args), b)
-                       if !reflect.DeepEqual(elementIDs, tt.args) {
-                               t.Errorf("mustReadElementIDsFrom() = %v, want 
%v", elementIDs, tt.args)
+                       if !reflect.DeepEqual(elementIDs, tt.elementIDs) {
+                               t.Errorf("mustReadTimestampsFrom() elementIDs = 
%v, want %v", elementIDs, tt.elementIDs)
                        }
                })
        }
 }
 
-func getBitInt64Arr() []int64 {
-       size := maxTimestampsBlockSize + 1
-       randSlice := make([]int64, size)
-       for i := range randSlice {
-               b := make([]byte, 8)
-               _, _ = rand.Read(b)
-               randSlice[i] = int64(binary.BigEndian.Uint64(b))
-       }
-       return randSlice
-}
-
 func Test_marshalAndUnmarshalTagFamily(t *testing.T) {
        metaBuffer, dataBuffer := &bytes.Buffer{}, &bytes.Buffer{}
        ww := &writers{
@@ -361,10 +313,9 @@ func Test_marshalAndUnmarshalTagFamily(t *testing.T) {
 }
 
 func Test_marshalAndUnmarshalBlock(t *testing.T) {
-       timestampBuffer, elementIDsBuffer := &bytes.Buffer{}, &bytes.Buffer{}
-       timestampWriter, elementIDsWriter := &writer{}, &writer{}
+       timestampBuffer := &bytes.Buffer{}
+       timestampWriter := &writer{}
        timestampWriter.init(timestampBuffer)
-       elementIDsWriter.init(elementIDsBuffer)
        ww := &writers{
                mustCreateTagFamilyWriters: func(_ string) (fs.Writer, 
fs.Writer) {
                        return &bytes.Buffer{}, &bytes.Buffer{}
@@ -372,12 +323,10 @@ func Test_marshalAndUnmarshalBlock(t *testing.T) {
                tagFamilyMetadataWriters: make(map[string]*writer),
                tagFamilyWriters:         make(map[string]*writer),
                timestampsWriter:         *timestampWriter,
-               elementIDsWriter:         *elementIDsWriter,
        }
        p := &part{
                primary:    &bytes.Buffer{},
                timestamps: timestampBuffer,
-               elementIDs: elementIDsBuffer,
        }
        b := &conventionalBlock
        tagProjection := toTagProjection(*b)
diff --git a/banyand/stream/block_writer.go b/banyand/stream/block_writer.go
index 8f51d743..5125bd85 100644
--- a/banyand/stream/block_writer.go
+++ b/banyand/stream/block_writer.go
@@ -65,7 +65,6 @@ type writers struct {
        tagFamilyMetadataWriters   map[string]*writer
        tagFamilyWriters           map[string]*writer
        timestampsWriter           writer
-       elementIDsWriter           writer
 }
 
 func (sw *writers) reset() {
@@ -73,7 +72,6 @@ func (sw *writers) reset() {
        sw.metaWriter.reset()
        sw.primaryWriter.reset()
        sw.timestampsWriter.reset()
-       sw.elementIDsWriter.reset()
 
        for i, w := range sw.tagFamilyMetadataWriters {
                w.reset()
@@ -87,7 +85,7 @@ func (sw *writers) reset() {
 
 func (sw *writers) totalBytesWritten() uint64 {
        n := sw.metaWriter.bytesWritten + sw.primaryWriter.bytesWritten +
-               sw.timestampsWriter.bytesWritten + 
sw.elementIDsWriter.bytesWritten
+               sw.timestampsWriter.bytesWritten
        for _, w := range sw.tagFamilyMetadataWriters {
                n += w.bytesWritten
        }
@@ -101,7 +99,6 @@ func (sw *writers) MustClose() {
        sw.metaWriter.MustClose()
        sw.primaryWriter.MustClose()
        sw.timestampsWriter.MustClose()
-       sw.elementIDsWriter.MustClose()
 
        for _, w := range sw.tagFamilyMetadataWriters {
                w.MustClose()
@@ -169,7 +166,6 @@ func (bw *blockWriter) MustInitForMemPart(mp *memPart) {
        bw.writers.metaWriter.init(&mp.meta)
        bw.writers.primaryWriter.init(&mp.primary)
        bw.writers.timestampsWriter.init(&mp.timestamps)
-       bw.writers.elementIDsWriter.init(&mp.elementIDs)
 }
 
 func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path 
string) {
@@ -182,7 +178,6 @@ func (bw *blockWriter) mustInitForFilePart(fileSystem 
fs.FileSystem, path string
        bw.writers.metaWriter.init(fs.MustCreateFile(fileSystem, 
filepath.Join(path, metaFilename), filePermission))
        bw.writers.primaryWriter.init(fs.MustCreateFile(fileSystem, 
filepath.Join(path, primaryFilename), filePermission))
        bw.writers.timestampsWriter.init(fs.MustCreateFile(fileSystem, 
filepath.Join(path, timestampsFilename), filePermission))
-       bw.writers.elementIDsWriter.init(fs.MustCreateFile(fileSystem, 
filepath.Join(path, elementIDsFilename), filePermission))
 }
 
 func (bw *blockWriter) MustWriteElements(sid common.SeriesID, timestamps 
[]int64, elementIDs []uint64, tagFamilies [][]tagValues) {
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index a834e2e7..b6ad05c7 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -37,7 +37,6 @@ const (
        primaryFilename                = "primary.bin"
        metaFilename                   = "meta.bin"
        timestampsFilename             = "timestamps.bin"
-       elementIDsFilename             = "elementIDs.bin"
        elementIndexFilename           = "idx"
        tagFamiliesMetadataFilenameExt = ".tfm"
        tagFamiliesFilenameExt         = ".tf"
@@ -46,7 +45,6 @@ const (
 type part struct {
        primary              fs.Reader
        timestamps           fs.Reader
-       elementIDs           fs.Reader
        fileSystem           fs.FileSystem
        tagFamilyMetadata    map[string]fs.Reader
        tagFamilies          map[string]fs.Reader
@@ -58,7 +56,6 @@ type part struct {
 func (p *part) close() {
        fs.MustClose(p.primary)
        fs.MustClose(p.timestamps)
-       fs.MustClose(p.elementIDs)
        for _, tf := range p.tagFamilies {
                fs.MustClose(tf)
        }
@@ -80,7 +77,6 @@ func openMemPart(mp *memPart) *part {
        // Open data files
        p.primary = &mp.primary
        p.timestamps = &mp.timestamps
-       p.elementIDs = &mp.elementIDs
        if mp.tagFamilies != nil {
                p.tagFamilies = make(map[string]fs.Reader)
                p.tagFamilyMetadata = make(map[string]fs.Reader)
@@ -98,7 +94,6 @@ type memPart struct {
        meta              bytes.Buffer
        primary           bytes.Buffer
        timestamps        bytes.Buffer
-       elementIDs        bytes.Buffer
        partMetadata      partMetadata
 }
 
@@ -124,7 +119,6 @@ func (mp *memPart) reset() {
        mp.meta.Reset()
        mp.primary.Reset()
        mp.timestamps.Reset()
-       mp.elementIDs.Reset()
        if mp.tagFamilies != nil {
                for _, tf := range mp.tagFamilies {
                        tf.Reset()
@@ -176,7 +170,6 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path 
string) {
        fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(path, 
metaFilename), filePermission)
        fs.MustFlush(fileSystem, mp.primary.Buf, filepath.Join(path, 
primaryFilename), filePermission)
        fs.MustFlush(fileSystem, mp.timestamps.Buf, filepath.Join(path, 
timestampsFilename), filePermission)
-       fs.MustFlush(fileSystem, mp.elementIDs.Buf, filepath.Join(path, 
elementIDsFilename), filePermission)
        for name, tf := range mp.tagFamilies {
                fs.MustFlush(fileSystem, tf.Buf, filepath.Join(path, 
name+tagFamiliesFilenameExt), filePermission)
        }
@@ -268,7 +261,6 @@ func mustOpenFilePart(id uint64, root string, fileSystem 
fs.FileSystem) *part {
 
        p.primary = mustOpenReader(path.Join(partPath, primaryFilename), 
fileSystem)
        p.timestamps = mustOpenReader(path.Join(partPath, timestampsFilename), 
fileSystem)
-       p.elementIDs = mustOpenReader(path.Join(partPath, elementIDsFilename), 
fileSystem)
        ee := fileSystem.ReadDir(partPath)
        for _, e := range ee {
                if e.IsDir() {
diff --git a/banyand/stream/snapshot_test.go b/banyand/stream/snapshot_test.go
index 0d8b80ce..fc2f3654 100644
--- a/banyand/stream/snapshot_test.go
+++ b/banyand/stream/snapshot_test.go
@@ -390,7 +390,6 @@ func TestSnapshotRemove(t *testing.T) {
                                        {p: &part{
                                                partMetadata: partMetadata{ID: 
1},
                                                timestamps:   &bytes.Buffer{},
-                                               elementIDs:   &bytes.Buffer{},
                                                primary:      &bytes.Buffer{},
                                        }, ref: 1},
                                        {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 2},
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index d3e901e4..60596f57 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -34,7 +34,6 @@ import (
 
 const (
        maxValuesBlockSize              = 8 * 1024 * 1024
-       maxTimestampsBlockSize          = 8 * 1024 * 1024
        maxElementIDsBlockSize          = 8 * 1024 * 1024
        maxTagFamiliesMetadataSize      = 8 * 1024 * 1024
        maxUncompressedBlockSize        = 2 * 1024 * 1024
diff --git a/pkg/encoding/int_list.go b/pkg/encoding/int_list.go
index 82b842c5..b0e68bc8 100644
--- a/pkg/encoding/int_list.go
+++ b/pkg/encoding/int_list.go
@@ -55,7 +55,7 @@ func Int64ListToBytes(dst []byte, a []int64) (result []byte, 
mt EncodeType, firs
 
 // BytesToInt64List decodes bytes into a list of int64.
 func BytesToInt64List(dst []int64, src []byte, mt EncodeType, firstValue 
int64, itemsCount int) ([]int64, error) {
-       dst = ExtendInt64ListCapacity(dst, itemsCount)
+       dst = ExtendListCapacity(dst, itemsCount)
 
        var err error
        switch mt {
@@ -100,11 +100,11 @@ func BytesToInt64List(dst []int64, src []byte, mt 
EncodeType, firstValue int64,
        }
 }
 
-// ExtendInt64ListCapacity extends the capacity of the int64 list.
-func ExtendInt64ListCapacity(dst []int64, additionalItems int) []int64 {
+// ExtendListCapacity extends the capacity of the given list.
+func ExtendListCapacity[T any](dst []T, additionalItems int) []T {
        dstLen := len(dst)
        if n := dstLen + additionalItems - cap(dst); n > 0 {
-               dst = append(dst[:cap(dst)], make([]int64, n)...)
+               dst = append(dst[:cap(dst)], make([]T, n)...)
        }
        return dst[:dstLen]
 }

Reply via email to