This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 16561def Refactor readData method in block.go (#782)
16561def is described below
commit 16561defde1d3d35ac73f60f298e9062f6d48329
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Sep 24 00:09:24 2025 +0800
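Refactor readData method in block.go (#782)

Note: beyond readData, this commit also:
- stops zstd-compressing sidx data blocks on write and decompressing them on
  read (block.go, part.go, query_result.go), so sidx data written by earlier
  versions is no longer decompressed when read;
- adds a `continue` in snapshot.remove so a retained, successfully acquired
  part is no longer also passed to markForRemoval;
- splits tsTable.syncSnapshot into collect/validate/execute/introduce helpers
  and changes getAllSidx to return a name-to-instance map;
- removes the trace cases' SharedContext setup from the multi-segments
  distributed integration suite.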
---
banyand/internal/sidx/block.go | 17 ++--
banyand/internal/sidx/part.go | 14 +--
banyand/internal/sidx/query_result.go | 8 +-
banyand/internal/sidx/snapshot.go | 1 +
banyand/trace/flusher.go | 4 +-
banyand/trace/merger.go | 4 +-
banyand/trace/syncer.go | 105 ++++++++++++++++++---
banyand/trace/tstable.go | 20 +---
.../multi_segments/multi_segments_suite_test.go | 5 -
9 files changed, 110 insertions(+), 68 deletions(-)
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index 85bd370e..524fdb78 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -25,7 +25,6 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
internalencoding
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
"github.com/apache/skywalking-banyandb/pkg/bytes"
- "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -341,11 +340,9 @@ func mustWriteDataTo(db *dataBlock, data [][]byte,
dataWriter *writer) {
// Encode all data payloads as a block
bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], data)
- // Compress and write
- compressedData := zstd.Compress(nil, bb.Buf, 1)
db.offset = dataWriter.bytesWritten
- db.size = uint64(len(compressedData))
- dataWriter.MustWrite(compressedData)
+ db.size = uint64(len(bb.Buf))
+ dataWriter.MustWrite(bb.Buf)
}
type blockPointer struct {
@@ -536,13 +533,11 @@ func (b *block) readData(decoder
*encoding.BytesBlockDecoder, sr *seqReaders, bm
bb.Buf = bb.Buf[:0]
bigValuePool.Put(bb)
}()
- bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(bm.dataBlock.size))
+ bb.Buf = bytes.ResizeOver(bb.Buf, int(bm.dataBlock.size))
sr.data.mustReadFull(bb.Buf)
- dataBuf, err := zstd.Decompress(bb.Buf[:0], bb.Buf)
- if err != nil {
- return fmt.Errorf("cannot decompress data: %w", err)
- }
- b.data, err = decoder.Decode(b.data[:0], dataBuf, bm.count)
+
+ var err error
+ b.data, err = decoder.Decode(b.data[:0], bb.Buf, bm.count)
if err != nil {
return fmt.Errorf("cannot decode data payloads: %w", err)
}
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index 6c7b12d9..94c6b7f7 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -297,7 +297,6 @@ func (p *part) readAll() ([]*elements, error) {
result := make([]*elements, 0, len(p.primaryBlockMetadata))
compressedPrimaryBuf := make([]byte, 0, 1024)
primaryBuf := make([]byte, 0, 1024)
- compressedDataBuf := make([]byte, 0, 1024)
dataBuf := make([]byte, 0, 1024)
compressedKeysBuf := make([]byte, 0, 1024)
@@ -348,17 +347,8 @@ func (p *part) readAll() ([]*elements, error) {
}
// Read data payloads
- compressedDataBuf = bytes.ResizeOver(compressedDataBuf,
int(bm.dataBlock.size))
- fs.MustReadData(p.data, int64(bm.dataBlock.offset),
compressedDataBuf)
-
- dataBuf, err = zstd.Decompress(dataBuf[:0],
compressedDataBuf)
- if err != nil {
- releaseElements(elems)
- for _, e := range result {
- releaseElements(e)
- }
- return nil, fmt.Errorf("cannot decompress data
block: %w", err)
- }
+ dataBuf = bytes.ResizeOver(dataBuf,
int(bm.dataBlock.size))
+ fs.MustReadData(p.data, int64(bm.dataBlock.offset),
dataBuf)
// Decode data payloads - create a new decoder for each
block to avoid state corruption
blockBytesDecoder := &encoding.BytesBlockDecoder{}
diff --git a/banyand/internal/sidx/query_result.go
b/banyand/internal/sidx/query_result.go
index d21c3b3c..09692a2f 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -24,7 +24,6 @@ import (
internalencoding
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/pkg/bytes"
- "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -87,14 +86,9 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p
*part, bm *blockMetadata
bb2.Buf = bytes.ResizeOver(bb2.Buf[:0], int(bm.dataBlock.size))
fs.MustReadData(p.data, int64(bm.dataBlock.offset), bb2.Buf)
- dataBuf, err := zstd.Decompress(bb.Buf[:0], bb2.Buf)
- if err != nil {
- return false
- }
-
// Decode data payloads
decoder := &encoding.BytesBlockDecoder{}
- tmpBlock.data, err = decoder.Decode(tmpBlock.data[:0], dataBuf,
bm.count)
+ tmpBlock.data, err = decoder.Decode(tmpBlock.data[:0], bb2.Buf,
bm.count)
if err != nil {
return false
}
diff --git a/banyand/internal/sidx/snapshot.go
b/banyand/internal/sidx/snapshot.go
index ce7cf6a6..809c0763 100644
--- a/banyand/internal/sidx/snapshot.go
+++ b/banyand/internal/sidx/snapshot.go
@@ -338,6 +338,7 @@ func (s *snapshot) remove(epoch uint64, toRemove
map[uint64]struct{}) *snapshot
if _, shouldRemove := toRemove[pw.ID()]; !shouldRemove {
if pw.acquire() {
result.parts = append(result.parts, pw)
+ continue
}
}
pw.markForRemoval()
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 727cad6c..33eb4d1e 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -268,9 +268,9 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan
*flusherIntroduction)
return
}
allSidx := tst.getAllSidx()
- for _, sidxInstance := range allSidx {
+ for name, sidxInstance := range allSidx {
if err := sidxInstance.Flush(); err != nil {
- tst.l.Warn().Err(err).Msg("sidx flush failed")
+ tst.l.Warn().Err(err).Str("sidx", name).Msg("sidx flush
failed")
return
}
}
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 2702e639..d96fad6f 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -112,9 +112,9 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator
snapshotCreator, part
if err != nil {
return nil, err
}
- for sidxName, sidxInstance := range tst.sidxMap {
+ for _, sidxInstance := range tst.getAllSidx() {
if err := sidxInstance.Merge(closeCh); err != nil {
- tst.l.Warn().Err(err).Str("sidx", sidxName).Msg("sidx
merge failed")
+ tst.l.Warn().Err(err).Msg("sidx merge failed")
return nil, err
}
}
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index a95c9e8c..a1ef98e0 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -183,6 +183,29 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot,
syncCh chan *syncIntrodu
if tst.loopCloser != nil && tst.loopCloser.Closed() {
return errClosed
}
+
+ // Collect parts to sync
+ partsToSync, sidxPartsToSync, err := tst.collectPartsToSync(curSnapshot)
+ if err != nil {
+ return err
+ }
+
+ // Validate sync preconditions
+ if err := tst.validateSyncPreconditions(partsToSync, sidxPartsToSync);
err != nil {
+ return err
+ }
+
+ // Execute sync operation
+ if err := tst.executeSyncOperation(partsToSync, sidxPartsToSync); err
!= nil {
+ return err
+ }
+
+ // Handle sync introductions
+ return tst.handleSyncIntroductions(partsToSync, sidxPartsToSync, syncCh)
+}
+
+// collectPartsToSync collects both core and sidx parts that need to be
synchronized.
+func (tst *tsTable) collectPartsToSync(curSnapshot *snapshot) ([]*part,
map[string][]*sidx.Part, error) {
// Get all parts from the current snapshot
var partsToSync []*part
for _, pw := range curSnapshot.parts {
@@ -190,14 +213,20 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot,
syncCh chan *syncIntrodu
partsToSync = append(partsToSync, pw.p)
}
}
+
sidxPartsToSync := sidx.NewPartsToSync()
- for name, sidx := range tst.sidxMap {
+ for name, sidx := range tst.getAllSidx() {
if tst.loopCloser != nil && tst.loopCloser.Closed() {
- return errClosed
+ return nil, nil, errClosed
}
sidxPartsToSync[name] = sidx.PartsToSync()
}
+ return partsToSync, sidxPartsToSync, nil
+}
+
+// validateSyncPreconditions validates that there are parts to sync and nodes
available.
+func (tst *tsTable) validateSyncPreconditions(partsToSync []*part,
sidxPartsToSync map[string][]*sidx.Part) error {
hasCoreParts := len(partsToSync) > 0
hasSidxParts := false
for _, parts := range sidxPartsToSync {
@@ -214,11 +243,16 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot,
syncCh chan *syncIntrodu
if len(nodes) == 0 {
return fmt.Errorf("no nodes to sync parts")
}
+
+ return nil
+}
+
+// executeSyncOperation performs the actual synchronization of parts to nodes.
+func (tst *tsTable) executeSyncOperation(partsToSync []*part, sidxPartsToSync
map[string][]*sidx.Part) error {
sort.Slice(partsToSync, func(i, j int) bool {
return partsToSync[i].partMetadata.ID <
partsToSync[j].partMetadata.ID
})
- // Use chunked sync with streaming for better memory efficiency
ctx := context.Background()
releaseFuncs := make([]func(), 0, len(partsToSync))
defer func() {
@@ -226,11 +260,14 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot,
syncCh chan *syncIntrodu
release()
}
}()
- if err := tst.syncStreamingPartsToNodes(ctx, nodes, partsToSync,
sidxPartsToSync, &releaseFuncs); err != nil {
- return err
- }
- // Construct syncIntroduction to remove synced parts from snapshot
+ nodes := tst.getNodes()
+ return tst.syncStreamingPartsToNodes(ctx, nodes, partsToSync,
sidxPartsToSync, &releaseFuncs)
+}
+
+// handleSyncIntroductions creates and processes sync introductions for both
core and sidx parts.
+func (tst *tsTable) handleSyncIntroductions(partsToSync []*part,
sidxPartsToSync map[string][]*sidx.Part, syncCh chan *syncIntroduction) error {
+ // Create core sync introduction
si := generateSyncIntroduction()
defer releaseSyncIntroduction(si)
si.applied = make(chan struct{})
@@ -241,7 +278,25 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot,
syncCh chan *syncIntrodu
}
si.synced[ph] = struct{}{}
}
- // Construct sidx sync introductions for each sidx instance
+
+ // Create sidx sync introductions
+ sidxSyncIntroductions, err :=
tst.createSidxSyncIntroductions(sidxPartsToSync)
+ if err != nil {
+ return err
+ }
+ defer tst.releaseSidxSyncIntroductions(sidxSyncIntroductions)
+
+ // Send sync introductions
+ if err := tst.sendSyncIntroductions(si, sidxSyncIntroductions, syncCh);
err != nil {
+ return err
+ }
+
+ // Wait for sync introductions to be applied
+ return tst.waitForSyncIntroductions(si, sidxSyncIntroductions)
+}
+
+// createSidxSyncIntroductions creates sync introductions for sidx parts.
+func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync
map[string][]*sidx.Part) (map[string]*sidx.SyncIntroduction, error) {
sidxSyncIntroductions := make(map[string]*sidx.SyncIntroduction)
for name, sidxParts := range sidxPartsToSync {
if len(sidxParts) > 0 {
@@ -253,32 +308,52 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot,
syncCh chan *syncIntrodu
sidxSyncIntroductions[name] = ssi
}
}
- // Send sync introductions
+ return sidxSyncIntroductions, nil
+}
+
+// releaseSidxSyncIntroductions releases sidx sync introductions.
+func (tst *tsTable) releaseSidxSyncIntroductions(sidxSyncIntroductions
map[string]*sidx.SyncIntroduction) {
+ for _, ssi := range sidxSyncIntroductions {
+ sidx.ReleaseSyncIntroduction(ssi)
+ }
+}
+
+// sendSyncIntroductions sends sync introductions to their respective channels.
+func (tst *tsTable) sendSyncIntroductions(si *syncIntroduction,
sidxSyncIntroductions map[string]*sidx.SyncIntroduction, syncCh chan
*syncIntroduction) error {
select {
case syncCh <- si:
case <-tst.loopCloser.CloseNotify():
return errClosed
}
+
for name, ssi := range sidxSyncIntroductions {
+ sidx, ok := tst.getSidx(name)
+ if !ok {
+ return fmt.Errorf("sidx %s not found", name)
+ }
select {
- case tst.sidxMap[name].SyncCh() <- ssi:
+ case sidx.SyncCh() <- ssi:
case <-tst.loopCloser.CloseNotify():
return errClosed
}
}
- // Wait for sync introductions to be applied
+ return nil
+}
+
+// waitForSyncIntroductions waits for all sync introductions to be applied.
+func (tst *tsTable) waitForSyncIntroductions(si *syncIntroduction,
sidxSyncIntroductions map[string]*sidx.SyncIntroduction) error {
select {
case <-si.applied:
case <-tst.loopCloser.CloseNotify():
return errClosed
}
+
for _, ssi := range sidxSyncIntroductions {
select {
case <-ssi.Applied:
case <-tst.loopCloser.CloseNotify():
return errClosed
}
- sidx.ReleaseSyncIntroduction(ssi)
}
return nil
}
@@ -298,7 +373,11 @@ func (tst *tsTable) syncStreamingPartsToNodes(ctx
context.Context, nodes []strin
streamingParts := make([]queue.StreamingPartData, 0)
// Add sidx streaming parts
for name, sidxParts := range sidxPartsToSync {
- sidxStreamingParts, sidxReleaseFuncs :=
tst.sidxMap[name].StreamingParts(sidxParts, tst.group, uint32(tst.shardID),
name)
+ sidx, ok := tst.getSidx(name)
+ if !ok {
+ return fmt.Errorf("sidx %s not found", name)
+ }
+ sidxStreamingParts, sidxReleaseFuncs :=
sidx.StreamingParts(sidxParts, tst.group, uint32(tst.shardID), name)
streamingParts = append(streamingParts,
sidxStreamingParts...)
*releaseFuncs = append(*releaseFuncs,
sidxReleaseFuncs...)
}
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 645dde18..86453916 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -306,25 +306,13 @@ func (tst *tsTable) getOrCreateSidx(name string)
(sidx.SIDX, error) {
}
// getAllSidx returns all sidx instances for potential multi-sidx queries.
-func (tst *tsTable) getAllSidx() []sidx.SIDX {
+func (tst *tsTable) getAllSidx() map[string]sidx.SIDX {
tst.RLock()
defer tst.RUnlock()
- result := make([]sidx.SIDX, 0, len(tst.sidxMap))
- for _, sidxInstance := range tst.sidxMap {
- result = append(result, sidxInstance)
- }
- return result
-}
-
-// getAllSidxNames returns all sidx names.
-func (tst *tsTable) getAllSidxNames() []string {
- tst.RLock()
- defer tst.RUnlock()
-
- result := make([]string, 0, len(tst.sidxMap))
- for sidxName := range tst.sidxMap {
- result = append(result, sidxName)
+ result := make(map[string]sidx.SIDX, len(tst.sidxMap))
+ for name, sidxInstance := range tst.sidxMap {
+ result[name] = sidxInstance
}
return result
}
diff --git
a/test/integration/distributed/multi_segments/multi_segments_suite_test.go
b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
index 5a8fa9fe..e331f6e6 100644
--- a/test/integration/distributed/multi_segments/multi_segments_suite_test.go
+++ b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
@@ -48,7 +48,6 @@ import (
casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
- casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
)
func TestDistributedMultiSegments(t *testing.T) {
@@ -137,10 +136,6 @@ var _ = SynchronizedBeforeSuite(func() []byte {
Connection: connection,
BaseTime: baseTime,
}
- casestrace.SharedContext = helpers.SharedContext{
- Connection: connection,
- BaseTime: baseTime,
- }
Expect(err).NotTo(HaveOccurred())
})