This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/sidx-bloom
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 9a808b8e1ed867d4bbc1533906fcfcaaf30be157
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Sep 26 22:12:42 2025 +0800

    Refactor and clean up trace and sidx modules
    
    - Removed unused functions and variables from the sidx and trace packages, including metadata retrieval and tag handling methods.
    - Simplified the handling of trace file chunks by removing error returns for certain operations, streamlining the code.
    - Updated the golangci configuration to remove deprecated linter paths.
    - Deleted the obsolete trace suite test file to clean up the codebase.
---
 .golangci.yml                                  |   9 --
 banyand/internal/sidx/multi_sidx_query_test.go |   1 -
 banyand/internal/sidx/part.go                  |  37 --------
 banyand/internal/sidx/snapshot.go              |  27 ------
 banyand/internal/sidx/tag.go                   |  26 ------
 banyand/trace/block.go                         |   4 -
 banyand/trace/bloom_filter.go                  |  16 ----
 banyand/trace/metrics.go                       |   5 +-
 banyand/trace/svc_standalone.go                |   3 +
 banyand/trace/syncer.go                        |   9 +-
 banyand/trace/tag.go                           |   3 +-
 banyand/trace/trace_suite_test.go              | 113 -------------------------
 banyand/trace/traces.go                        |  58 ++-----------
 banyand/trace/write_data.go                    |  14 +--
 14 files changed, 18 insertions(+), 307 deletions(-)

diff --git a/.golangci.yml b/.golangci.yml
index eadf4197..da1c435a 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -152,13 +152,4 @@ issues:
   - linters:
     - staticcheck
     text: "SA1019: package github.com/golang/protobuf"
-  # TODO: remove this after the trace is done
-  - path: "^trace/"
-    linters:
-    - unused
-    - unparam
-  # TODO: remove this after the sidx is done
-  - path: "internal/sidx/"
-    linters:
-    - unused
   max-same-issues: 0
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go b/banyand/internal/sidx/multi_sidx_query_test.go
index b15553b7..155c06bf 100644
--- a/banyand/internal/sidx/multi_sidx_query_test.go
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -35,7 +35,6 @@ type mockSIDX struct {
        err      error
        response *QueryResponse
        name     string
-       delay    bool
 }
 
 func (m *mockSIDX) MustAddMemPart(_ context.Context, _ *memPart) {}
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index 94c6b7f7..5b02efbb 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -458,11 +458,6 @@ func (p *part) String() string {
        return fmt.Sprintf("sidx part at %s", p.path)
 }
 
-// getPartMetadata returns the part metadata.
-func (p *part) getPartMetadata() *partMetadata {
-       return p.partMetadata
-}
-
 // getTagDataReader returns the tag data reader for the specified tag name.
 func (p *part) getTagDataReader(tagName string) (fs.Reader, bool) {
        reader, exists := p.tagData[tagName]
@@ -481,38 +476,6 @@ func (p *part) getTagFilterReader(tagName string) (fs.Reader, bool) {
        return reader, exists
 }
 
-// getAvailableTagNames returns a slice of all available tag names in this 
part.
-func (p *part) getAvailableTagNames() []string {
-       tagNames := make(map[string]struct{})
-
-       // Collect tag names from all tag file types.
-       for tagName := range p.tagData {
-               tagNames[tagName] = struct{}{}
-       }
-       for tagName := range p.tagMetadata {
-               tagNames[tagName] = struct{}{}
-       }
-       for tagName := range p.tagFilters {
-               tagNames[tagName] = struct{}{}
-       }
-
-       // Convert to slice.
-       result := make([]string, 0, len(tagNames))
-       for tagName := range tagNames {
-               result = append(result, tagName)
-       }
-
-       return result
-}
-
-// hasTagFiles returns true if the part has any tag files for the specified 
tag name.
-func (p *part) hasTagFiles(tagName string) bool {
-       _, hasData := p.tagData[tagName]
-       _, hasMeta := p.tagMetadata[tagName]
-       _, hasFilter := p.tagFilters[tagName]
-       return hasData || hasMeta || hasFilter
-}
-
 // Path returns the part's directory path.
 func (p *part) Path() string {
        return p.path
diff --git a/banyand/internal/sidx/snapshot.go b/banyand/internal/sidx/snapshot.go
index 809c0763..c16d9e30 100644
--- a/banyand/internal/sidx/snapshot.go
+++ b/banyand/internal/sidx/snapshot.go
@@ -21,7 +21,6 @@ import (
        "encoding/json"
        "fmt"
        "path/filepath"
-       "sort"
        "strconv"
        "sync/atomic"
 
@@ -186,32 +185,6 @@ func (s *snapshot) validate() error {
        return nil
 }
 
-// sortPartsByEpoch sorts parts by their epoch (ID), oldest first.
-// This ensures consistent iteration order during queries.
-func (s *snapshot) sortPartsByEpoch() {
-       sort.Slice(s.parts, func(i, j int) bool {
-               partI := s.parts[i].p
-               partJ := s.parts[j].p
-
-               if partI == nil || partI.partMetadata == nil {
-                       return false
-               }
-               if partJ == nil || partJ.partMetadata == nil {
-                       return true
-               }
-
-               return partI.partMetadata.ID < partJ.partMetadata.ID
-       })
-}
-
-// copyParts creates a copy of the parts slice for safe iteration.
-// The caller should acquire references to parts they intend to use.
-func (s *snapshot) copyParts() []*partWrapper {
-       result := make([]*partWrapper, len(s.parts))
-       copy(result, s.parts)
-       return result
-}
-
 // addPart adds a new part to the snapshot during construction.
 // This should only be called before the snapshot is made available to other 
goroutines.
 // After construction, snapshots should be treated as immutable.
diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go
index 2db4ec53..d02ebf24 100644
--- a/banyand/internal/sidx/tag.go
+++ b/banyand/internal/sidx/tag.go
@@ -18,7 +18,6 @@
 package sidx
 
 import (
-       "bytes"
        "fmt"
 
        pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -217,31 +216,6 @@ func (td *tagData) updateMinMax() {
        }
 }
 
-// addValue adds a value to the tag data.
-func (td *tagData) addValue(value []byte) {
-       td.values = append(td.values, value)
-
-       // Update filter for indexed tags
-       if td.filter != nil {
-               td.filter.Add(value)
-       }
-}
-
-// hasValue checks if a value exists in the tag using the bloom filter.
-func (td *tagData) hasValue(value []byte) bool {
-       if td.filter == nil {
-               // For non-indexed tags, do linear search
-               for _, v := range td.values {
-                       if bytes.Equal(v, value) {
-                               return true
-                       }
-               }
-               return false
-       }
-
-       return td.filter.MightContain(value)
-}
-
 // marshal serializes tag metadata to bytes using encoding package.
 func (tm *tagMetadata) marshal(dst []byte) []byte {
        dst = pkgencoding.EncodeBytes(dst, []byte(tm.name))
diff --git a/banyand/trace/block.go b/banyand/trace/block.go
index ca6c4073..9ba12bee 100644
--- a/banyand/trace/block.go
+++ b/banyand/trace/block.go
@@ -580,10 +580,6 @@ func assertIdxAndOffset(name string, length int, idx int, 
offset int) {
        }
 }
 
-func (bi *blockPointer) isFull() bool {
-       return bi.bm.uncompressedSpanSizeBytes >= maxUncompressedSpanSize
-}
-
 func (bi *blockPointer) reset() {
        bi.idx = 0
        bi.block.reset()
diff --git a/banyand/trace/bloom_filter.go b/banyand/trace/bloom_filter.go
index 6e8efe5f..67e182b7 100644
--- a/banyand/trace/bloom_filter.go
+++ b/banyand/trace/bloom_filter.go
@@ -21,7 +21,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/filter"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 func encodeBloomFilter(dst []byte, bf *filter.BloomFilter) []byte {
@@ -44,18 +43,3 @@ func decodeBloomFilter(src []byte, bf *filter.BloomFilter) 
*filter.BloomFilter {
 
        return bf
 }
-
-func generateBloomFilter() *filter.BloomFilter {
-       v := bloomFilterPool.Get()
-       if v == nil {
-               return filter.NewBloomFilter(0)
-       }
-       return v
-}
-
-func releaseBloomFilter(bf *filter.BloomFilter) {
-       bf.Reset()
-       bloomFilterPool.Put(bf)
-}
-
-var bloomFilterPool = pool.Register[*filter.BloomFilter]("trace-bloomFilter")
diff --git a/banyand/trace/metrics.go b/banyand/trace/metrics.go
index 2402e308..e71514f0 100644
--- a/banyand/trace/metrics.go
+++ b/banyand/trace/metrics.go
@@ -26,9 +26,8 @@ import (
 )
 
 var (
-       streamScope  = observability.RootScope.SubScope("stream")
-       tbScope      = streamScope.SubScope("tst")
-       storageScope = streamScope.SubScope("storage")
+       streamScope = observability.RootScope.SubScope("stream")
+       tbScope     = streamScope.SubScope("tst")
 )
 
 type metrics struct {
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 3484605e..dfb42242 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -160,6 +160,9 @@ func (s *standalone) PreRun(ctx context.Context) error {
        if err != nil {
                return err
        }
+       if err = s.pipeline.Subscribe(data.TopicSnapshot, &snapshotListener{s: s}); err != nil {
+               return err
+       }
 
        s.localPipeline = queue.Local()
        err = s.localPipeline.Subscribe(data.TopicTraceWrite, writeListener)
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index ba95c470..8e98a3e0 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -294,10 +294,7 @@ func (tst *tsTable) handleSyncIntroductions(partsToSync 
[]*part, sidxPartsToSync
        }
 
        // Create sidx sync introductions
-       sidxSyncIntroductions, err := 
tst.createSidxSyncIntroductions(sidxPartsToSync)
-       if err != nil {
-               return err
-       }
+       sidxSyncIntroductions := tst.createSidxSyncIntroductions(sidxPartsToSync)
        defer tst.releaseSidxSyncIntroductions(sidxSyncIntroductions)
 
        // Send sync introductions
@@ -310,7 +307,7 @@ func (tst *tsTable) handleSyncIntroductions(partsToSync 
[]*part, sidxPartsToSync
 }
 
 // createSidxSyncIntroductions creates sync introductions for sidx parts.
-func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync 
map[string][]*sidx.Part) (map[string]*sidx.SyncIntroduction, error) {
+func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync map[string][]*sidx.Part) map[string]*sidx.SyncIntroduction {
        sidxSyncIntroductions := make(map[string]*sidx.SyncIntroduction)
        for name, sidxParts := range sidxPartsToSync {
                if len(sidxParts) > 0 {
@@ -322,7 +319,7 @@ func (tst *tsTable) 
createSidxSyncIntroductions(sidxPartsToSync map[string][]*si
                        sidxSyncIntroductions[name] = ssi
                }
        }
-       return sidxSyncIntroductions, nil
+       return sidxSyncIntroductions
 }
 
 // releaseSidxSyncIntroductions releases sidx sync introductions.
diff --git a/banyand/trace/tag.go b/banyand/trace/tag.go
index bd576ece..c8c0daaf 100644
--- a/banyand/trace/tag.go
+++ b/banyand/trace/tag.go
@@ -42,14 +42,13 @@ func (t *tag) reset() {
        t.values = values[:0]
 }
 
-func (t *tag) resizeValues(valuesLen int) [][]byte {
+func (t *tag) resizeValues(valuesLen int) {
        values := t.values
        if n := valuesLen - cap(values); n > 0 {
                values = append(values[:cap(values)], make([][]byte, n)...)
        }
        values = values[:valuesLen]
        t.values = values
-       return values
 }
 
 func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer) {
diff --git a/banyand/trace/trace_suite_test.go b/banyand/trace/trace_suite_test.go
deleted file mode 100644
index 05140094..00000000
--- a/banyand/trace/trace_suite_test.go
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package trace_test
-
-import (
-       "context"
-       "testing"
-
-       g "github.com/onsi/ginkgo/v2"
-       "github.com/onsi/gomega"
-
-       "github.com/apache/skywalking-banyandb/banyand/metadata"
-       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedserver"
-       "github.com/apache/skywalking-banyandb/banyand/observability"
-       "github.com/apache/skywalking-banyandb/banyand/protector"
-       "github.com/apache/skywalking-banyandb/banyand/queue"
-       "github.com/apache/skywalking-banyandb/banyand/trace"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/test"
-       "github.com/apache/skywalking-banyandb/pkg/test/flags"
-       testtrace "github.com/apache/skywalking-banyandb/pkg/test/trace"
-)
-
-func TestTrace(t *testing.T) {
-       gomega.RegisterFailHandler(g.Fail)
-       g.RunSpecs(t, "Trace Suite")
-}
-
-var _ = g.BeforeSuite(func() {
-       gomega.Expect(logger.Init(logger.Logging{
-               Env:   "dev",
-               Level: flags.LogLevel,
-       })).To(gomega.Succeed())
-})
-
-type preloadTraceService struct {
-       metaSvc metadata.Service
-}
-
-func (p *preloadTraceService) Name() string {
-       return "preload-trace"
-}
-
-func (p *preloadTraceService) PreRun(ctx context.Context) error {
-       return testtrace.PreloadSchema(ctx, p.metaSvc.SchemaRegistry())
-}
-
-type services struct {
-       trace           trace.Service
-       metadataService metadata.Service
-       pipeline        queue.Queue
-}
-
-func setUp() (*services, func()) {
-       // Init Pipeline
-       pipeline := queue.Local()
-
-       // Init Metadata Service
-       metadataService, err := embeddedserver.NewService(context.TODO())
-       gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       gomega.Expect(err).NotTo(gomega.HaveOccurred())
-
-       metricSvc := observability.NewMetricService(metadataService, pipeline, 
"test", nil)
-       pm := protector.NewMemory(metricSvc)
-       // Init Trace Service
-       traceService, err := trace.NewService(metadataService, pipeline, 
metricSvc, pm)
-       gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       preloadTraceSvc := &preloadTraceService{metaSvc: metadataService}
-       // querySvc, err := query.NewService(context.TODO(), traceService, nil, 
metadataService, pipeline)
-       gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       var flags []string
-       metaPath, metaDeferFunc, err := test.NewSpace()
-       gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       flags = append(flags, "--metadata-root-path="+metaPath)
-       rootPath, deferFunc, err := test.NewSpace()
-       gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       flags = append(flags, "--trace-root-path="+rootPath)
-       listenClientURL, listenPeerURL, err := test.NewEtcdListenUrls()
-       gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       flags = append(flags, "--etcd-listen-client-url="+listenClientURL, 
"--etcd-listen-peer-url="+listenPeerURL)
-       moduleDeferFunc := test.SetupModules(
-               flags,
-               pipeline,
-               metadataService,
-               preloadTraceSvc,
-               traceService,
-               // querySvc,
-       )
-       return &services{
-                       trace:           traceService,
-                       metadataService: metadataService,
-                       pipeline:        pipeline,
-               }, func() {
-                       moduleDeferFunc()
-                       metaDeferFunc()
-                       deferFunc()
-               }
-}
diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go
index bf92d649..b8129008 100644
--- a/banyand/trace/traces.go
+++ b/banyand/trace/traces.go
@@ -18,11 +18,10 @@
 package trace
 
 import (
-       "bytes"
-
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/internal/wqueue"
@@ -45,19 +44,6 @@ func (t *tagValue) reset() {
        t.valueArr = nil
 }
 
-func (t *tagValue) size() int {
-       s := len(t.tag)
-       if t.value != nil {
-               s += len(t.value)
-       }
-       if t.valueArr != nil {
-               for i := range t.valueArr {
-                       s += len(t.valueArr[i])
-               }
-       }
-       return s
-}
-
 func (t *tagValue) marshal() []byte {
        if t.valueArr != nil {
                var dst []byte
@@ -66,7 +52,7 @@ func (t *tagValue) marshal() []byte {
                                dst = append(dst, t.valueArr[i]...)
                                continue
                        }
-                       dst = marshalVarArray(dst, t.valueArr[i])
+                       dst = encoding.MarshalVarArray(dst, t.valueArr[i])
                }
                return dst
        }
@@ -88,43 +74,22 @@ func releaseTagValue(v *tagValue) {
 
 var tagValuePool = pool.Register[*tagValue]("trace-tagValue")
 
-const (
-       entityDelimiter = '|'
-       escape          = '\\'
-)
-
-func marshalVarArray(dest, src []byte) []byte {
-       if bytes.IndexByte(src, entityDelimiter) < 0 && bytes.IndexByte(src, 
escape) < 0 {
-               dest = append(dest, src...)
-               dest = append(dest, entityDelimiter)
-               return dest
-       }
-       for _, b := range src {
-               if b == entityDelimiter || b == escape {
-                       dest = append(dest, escape)
-               }
-               dest = append(dest, b)
-       }
-       dest = append(dest, entityDelimiter)
-       return dest
-}
-
 func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) {
        if len(src) == 0 {
                return nil, nil, errors.New("empty entity value")
        }
-       if src[0] == entityDelimiter {
+       if src[0] == encoding.EntityDelimiter {
                return dest, src[1:], nil
        }
        for len(src) > 0 {
                switch {
-               case src[0] == escape:
+               case src[0] == encoding.Escape:
                        if len(src) < 2 {
                                return nil, nil, errors.New("invalid escape 
character")
                        }
                        src = src[1:]
                        dest = append(dest, src[0])
-               case src[0] == entityDelimiter:
+               case src[0] == encoding.EntityDelimiter:
                        return dest, src[1:], nil
                default:
                        dest = append(dest, src[0])
@@ -134,19 +99,6 @@ func unmarshalVarArray(dest, src []byte) ([]byte, []byte, 
error) {
        return nil, nil, errors.New("invalid variable array")
 }
 
-type tagValues struct {
-       tag    string
-       values []*tagValue
-}
-
-func (t *tagValues) reset() {
-       t.tag = ""
-       for i := range t.values {
-               releaseTagValue(t.values[i])
-       }
-       t.values = t.values[:0]
-}
-
 type traces struct {
        traceIDs   []string
        timestamps []int64
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 9873b4d8..ed3c74dc 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -212,15 +212,11 @@ func (s *syncCallback) handleTraceFileChunk(ctx 
*queue.ChunkedSyncPartContext, c
                partCtx.writers.spanWriter.MustWrite(chunk)
        case fileName == traceIDFilterFilename:
                if partCtx.memPart != nil {
-                       if err := s.handleTraceIDFilterChunk(partCtx, chunk); 
err != nil {
-                               return fmt.Errorf("failed to handle traceID 
filter chunk: %w", err)
-                       }
+                       s.handleTraceIDFilterChunk(partCtx, chunk)
                }
        case fileName == tagTypeFilename:
                if partCtx.memPart != nil {
-                       if err := s.handleTagTypeChunk(partCtx, chunk); err != 
nil {
-                               return fmt.Errorf("failed to handle tag type 
chunk: %w", err)
-                       }
+                       s.handleTagTypeChunk(partCtx, chunk)
                }
        case strings.HasPrefix(fileName, traceTagsPrefix):
                tagName := fileName[len(traceTagsPrefix):]
@@ -237,12 +233,10 @@ func (s *syncCallback) handleTraceFileChunk(ctx 
*queue.ChunkedSyncPartContext, c
        return nil
 }
 
-func (s *syncCallback) handleTraceIDFilterChunk(partCtx *syncPartContext, 
chunk []byte) error {
+func (s *syncCallback) handleTraceIDFilterChunk(partCtx *syncPartContext, chunk []byte) {
        partCtx.traceIDFilterBuffer = append(partCtx.traceIDFilterBuffer, 
chunk...)
-       return nil
 }
 
-func (s *syncCallback) handleTagTypeChunk(partCtx *syncPartContext, chunk 
[]byte) error {
+func (s *syncCallback) handleTagTypeChunk(partCtx *syncPartContext, chunk []byte) {
        partCtx.tagTypeBuffer = append(partCtx.tagTypeBuffer, chunk...)
-       return nil
 }

Reply via email to