This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug/sidx-bloom in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 9a808b8e1ed867d4bbc1533906fcfcaaf30be157 Author: Gao Hongtao <[email protected]> AuthorDate: Fri Sep 26 22:12:42 2025 +0800 Refactor and clean up trace and sidx modules - Removed unused functions and variables from the sidx and trace packages, including metadata retrieval and tag handling methods. - Simplified the handling of trace file chunks by removing error returns for certain operations, streamlining the code. - Updated the golangci configuration to remove deprecated linter paths. - Deleted the obsolete trace suite test file to clean up the codebase. --- .golangci.yml | 9 -- banyand/internal/sidx/multi_sidx_query_test.go | 1 - banyand/internal/sidx/part.go | 37 -------- banyand/internal/sidx/snapshot.go | 27 ------ banyand/internal/sidx/tag.go | 26 ------ banyand/trace/block.go | 4 - banyand/trace/bloom_filter.go | 16 ---- banyand/trace/metrics.go | 5 +- banyand/trace/svc_standalone.go | 3 + banyand/trace/syncer.go | 9 +- banyand/trace/tag.go | 3 +- banyand/trace/trace_suite_test.go | 113 ------------------------- banyand/trace/traces.go | 58 ++----------- banyand/trace/write_data.go | 14 +-- 14 files changed, 18 insertions(+), 307 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index eadf4197..da1c435a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -152,13 +152,4 @@ issues: - linters: - staticcheck text: "SA1019: package github.com/golang/protobuf" - # TODO: remove this after the trace is done - - path: "^trace/" - linters: - - unused - - unparam - # TODO: remove this after the sidx is done - - path: "internal/sidx/" - linters: - - unused max-same-issues: 0 diff --git a/banyand/internal/sidx/multi_sidx_query_test.go b/banyand/internal/sidx/multi_sidx_query_test.go index b15553b7..155c06bf 100644 --- a/banyand/internal/sidx/multi_sidx_query_test.go +++ b/banyand/internal/sidx/multi_sidx_query_test.go @@ -35,7 +35,6 @@ type mockSIDX struct { err error response *QueryResponse name string - delay bool } func (m *mockSIDX) MustAddMemPart(_ context.Context, _ *memPart) {} diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go index 94c6b7f7..5b02efbb 100644 --- a/banyand/internal/sidx/part.go +++ b/banyand/internal/sidx/part.go @@ -458,11 +458,6 @@ func (p *part) String() string { return fmt.Sprintf("sidx part at %s", p.path) } -// getPartMetadata returns the part metadata. -func (p *part) getPartMetadata() *partMetadata { - return p.partMetadata -} - // getTagDataReader returns the tag data reader for the specified tag name. func (p *part) getTagDataReader(tagName string) (fs.Reader, bool) { reader, exists := p.tagData[tagName] @@ -481,38 +476,6 @@ func (p *part) getTagFilterReader(tagName string) (fs.Reader, bool) { return reader, exists } -// getAvailableTagNames returns a slice of all available tag names in this part. -func (p *part) getAvailableTagNames() []string { - tagNames := make(map[string]struct{}) - - // Collect tag names from all tag file types. - for tagName := range p.tagData { - tagNames[tagName] = struct{}{} - } - for tagName := range p.tagMetadata { - tagNames[tagName] = struct{}{} - } - for tagName := range p.tagFilters { - tagNames[tagName] = struct{}{} - } - - // Convert to slice. - result := make([]string, 0, len(tagNames)) - for tagName := range tagNames { - result = append(result, tagName) - } - - return result -} - -// hasTagFiles returns true if the part has any tag files for the specified tag name. -func (p *part) hasTagFiles(tagName string) bool { - _, hasData := p.tagData[tagName] - _, hasMeta := p.tagMetadata[tagName] - _, hasFilter := p.tagFilters[tagName] - return hasData || hasMeta || hasFilter -} - // Path returns the part's directory path. func (p *part) Path() string { return p.path diff --git a/banyand/internal/sidx/snapshot.go b/banyand/internal/sidx/snapshot.go index 809c0763..c16d9e30 100644 --- a/banyand/internal/sidx/snapshot.go +++ b/banyand/internal/sidx/snapshot.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "path/filepath" - "sort" "strconv" "sync/atomic" @@ -186,32 +185,6 @@ func (s *snapshot) validate() error { return nil } -// sortPartsByEpoch sorts parts by their epoch (ID), oldest first. -// This ensures consistent iteration order during queries. -func (s *snapshot) sortPartsByEpoch() { - sort.Slice(s.parts, func(i, j int) bool { - partI := s.parts[i].p - partJ := s.parts[j].p - - if partI == nil || partI.partMetadata == nil { - return false - } - if partJ == nil || partJ.partMetadata == nil { - return true - } - - return partI.partMetadata.ID < partJ.partMetadata.ID - }) -} - -// copyParts creates a copy of the parts slice for safe iteration. -// The caller should acquire references to parts they intend to use. -func (s *snapshot) copyParts() []*partWrapper { - result := make([]*partWrapper, len(s.parts)) - copy(result, s.parts) - return result -} - // addPart adds a new part to the snapshot during construction. // This should only be called before the snapshot is made available to other goroutines. // After construction, snapshots should be treated as immutable. diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go index 2db4ec53..d02ebf24 100644 --- a/banyand/internal/sidx/tag.go +++ b/banyand/internal/sidx/tag.go @@ -18,7 +18,6 @@ package sidx import ( - "bytes" "fmt" pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding" @@ -217,31 +216,6 @@ func (td *tagData) updateMinMax() { } } -// addValue adds a value to the tag data. -func (td *tagData) addValue(value []byte) { - td.values = append(td.values, value) - - // Update filter for indexed tags - if td.filter != nil { - td.filter.Add(value) - } -} - -// hasValue checks if a value exists in the tag using the bloom filter. -func (td *tagData) hasValue(value []byte) bool { - if td.filter == nil { - // For non-indexed tags, do linear search - for _, v := range td.values { - if bytes.Equal(v, value) { - return true - } - } - return false - } - - return td.filter.MightContain(value) -} - // marshal serializes tag metadata to bytes using encoding package. func (tm *tagMetadata) marshal(dst []byte) []byte { dst = pkgencoding.EncodeBytes(dst, []byte(tm.name)) diff --git a/banyand/trace/block.go b/banyand/trace/block.go index ca6c4073..9ba12bee 100644 --- a/banyand/trace/block.go +++ b/banyand/trace/block.go @@ -580,10 +580,6 @@ func assertIdxAndOffset(name string, length int, idx int, offset int) { } } -func (bi *blockPointer) isFull() bool { - return bi.bm.uncompressedSpanSizeBytes >= maxUncompressedSpanSize -} - func (bi *blockPointer) reset() { bi.idx = 0 bi.block.reset() diff --git a/banyand/trace/bloom_filter.go b/banyand/trace/bloom_filter.go index 6e8efe5f..67e182b7 100644 --- a/banyand/trace/bloom_filter.go +++ b/banyand/trace/bloom_filter.go @@ -21,7 +21,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/filter" "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/pool" ) func encodeBloomFilter(dst []byte, bf *filter.BloomFilter) []byte { @@ -44,18 +43,3 @@ func decodeBloomFilter(src []byte, bf *filter.BloomFilter) *filter.BloomFilter { return bf } - -func generateBloomFilter() *filter.BloomFilter { - v := bloomFilterPool.Get() - if v == nil { - return filter.NewBloomFilter(0) - } - return v -} - -func releaseBloomFilter(bf *filter.BloomFilter) { - bf.Reset() - bloomFilterPool.Put(bf) -} - -var bloomFilterPool = pool.Register[*filter.BloomFilter]("trace-bloomFilter") diff --git a/banyand/trace/metrics.go b/banyand/trace/metrics.go index 2402e308..e71514f0 100644 --- a/banyand/trace/metrics.go +++ b/banyand/trace/metrics.go @@ -26,9 +26,8 @@ import ( ) var ( - streamScope = observability.RootScope.SubScope("stream") - tbScope = streamScope.SubScope("tst") - storageScope = streamScope.SubScope("storage") + streamScope = observability.RootScope.SubScope("stream") + tbScope = streamScope.SubScope("tst") ) type metrics struct { diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go index 3484605e..dfb42242 100644 --- a/banyand/trace/svc_standalone.go +++ b/banyand/trace/svc_standalone.go @@ -160,6 +160,9 @@ func (s *standalone) PreRun(ctx context.Context) error { if err != nil { return err } + if err = s.pipeline.Subscribe(data.TopicSnapshot, &snapshotListener{s: s}); err != nil { + return err + } s.localPipeline = queue.Local() err = s.localPipeline.Subscribe(data.TopicTraceWrite, writeListener) diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go index ba95c470..8e98a3e0 100644 --- a/banyand/trace/syncer.go +++ b/banyand/trace/syncer.go @@ -294,10 +294,7 @@ func (tst *tsTable) handleSyncIntroductions(partsToSync []*part, sidxPartsToSync } // Create sidx sync introductions - sidxSyncIntroductions, err := tst.createSidxSyncIntroductions(sidxPartsToSync) - if err != nil { - return err - } + sidxSyncIntroductions := tst.createSidxSyncIntroductions(sidxPartsToSync) defer tst.releaseSidxSyncIntroductions(sidxSyncIntroductions) // Send sync introductions @@ -310,7 +307,7 @@ func (tst *tsTable) handleSyncIntroductions(partsToSync []*part, sidxPartsToSync } // createSidxSyncIntroductions creates sync introductions for sidx parts. -func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync map[string][]*sidx.Part) (map[string]*sidx.SyncIntroduction, error) { +func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync map[string][]*sidx.Part) map[string]*sidx.SyncIntroduction { sidxSyncIntroductions := make(map[string]*sidx.SyncIntroduction) for name, sidxParts := range sidxPartsToSync { if len(sidxParts) > 0 { @@ -322,7 +319,7 @@ func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync map[string][]*si sidxSyncIntroductions[name] = ssi } } - return sidxSyncIntroductions, nil + return sidxSyncIntroductions } // releaseSidxSyncIntroductions releases sidx sync introductions. diff --git a/banyand/trace/tag.go b/banyand/trace/tag.go index bd576ece..c8c0daaf 100644 --- a/banyand/trace/tag.go +++ b/banyand/trace/tag.go @@ -42,14 +42,13 @@ func (t *tag) reset() { t.values = values[:0] } -func (t *tag) resizeValues(valuesLen int) [][]byte { +func (t *tag) resizeValues(valuesLen int) { values := t.values if n := valuesLen - cap(values); n > 0 { values = append(values[:cap(values)], make([][]byte, n)...) } values = values[:valuesLen] t.values = values - return values } func (t *tag) mustWriteTo(tm *tagMetadata, tagWriter *writer) { diff --git a/banyand/trace/trace_suite_test.go b/banyand/trace/trace_suite_test.go deleted file mode 100644 index 05140094..00000000 --- a/banyand/trace/trace_suite_test.go +++ /dev/null @@ -1,113 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package trace_test - -import ( - "context" - "testing" - - g "github.com/onsi/ginkgo/v2" - "github.com/onsi/gomega" - - "github.com/apache/skywalking-banyandb/banyand/metadata" - "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedserver" - "github.com/apache/skywalking-banyandb/banyand/observability" - "github.com/apache/skywalking-banyandb/banyand/protector" - "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/banyand/trace" - "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/test" - "github.com/apache/skywalking-banyandb/pkg/test/flags" - testtrace "github.com/apache/skywalking-banyandb/pkg/test/trace" -) - -func TestTrace(t *testing.T) { - gomega.RegisterFailHandler(g.Fail) - g.RunSpecs(t, "Trace Suite") -} - -var _ = g.BeforeSuite(func() { - gomega.Expect(logger.Init(logger.Logging{ - Env: "dev", - Level: flags.LogLevel, - })).To(gomega.Succeed()) -}) - -type preloadTraceService struct { - metaSvc metadata.Service -} - -func (p *preloadTraceService) Name() string { - return "preload-trace" -} - -func (p *preloadTraceService) PreRun(ctx context.Context) error { - return testtrace.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) -} - -type services struct { - trace trace.Service - metadataService metadata.Service - pipeline queue.Queue -} - -func setUp() (*services, func()) { - // Init Pipeline - pipeline := queue.Local() - - // Init Metadata Service - metadataService, err := embeddedserver.NewService(context.TODO()) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - - metricSvc := observability.NewMetricService(metadataService, pipeline, "test", nil) - pm := protector.NewMemory(metricSvc) - // Init Trace Service - traceService, err := trace.NewService(metadataService, pipeline, metricSvc, pm) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - preloadTraceSvc := &preloadTraceService{metaSvc: metadataService} - // querySvc, err := query.NewService(context.TODO(), traceService, nil, metadataService, pipeline) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - var flags []string - metaPath, metaDeferFunc, err := test.NewSpace() - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - flags = append(flags, "--metadata-root-path="+metaPath) - rootPath, deferFunc, err := test.NewSpace() - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - flags = append(flags, "--trace-root-path="+rootPath) - listenClientURL, listenPeerURL, err := test.NewEtcdListenUrls() - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - flags = append(flags, "--etcd-listen-client-url="+listenClientURL, "--etcd-listen-peer-url="+listenPeerURL) - moduleDeferFunc := test.SetupModules( - flags, - pipeline, - metadataService, - preloadTraceSvc, - traceService, - // querySvc, - ) - return &services{ - trace: traceService, - metadataService: metadataService, - pipeline: pipeline, - }, func() { - moduleDeferFunc() - metaDeferFunc() - deferFunc() - } -} diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go index bf92d649..b8129008 100644 --- a/banyand/trace/traces.go +++ b/banyand/trace/traces.go @@ -18,11 +18,10 @@ package trace import ( - "bytes" - "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/encoding" "github.com/apache/skywalking-banyandb/banyand/internal/sidx" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/internal/wqueue" @@ -45,19 +44,6 @@ func (t *tagValue) reset() { t.valueArr = nil } -func (t *tagValue) size() int { - s := len(t.tag) - if t.value != nil { - s += len(t.value) - } - if t.valueArr != nil { - for i := range t.valueArr { - s += len(t.valueArr[i]) - } - } - return s -} - func (t *tagValue) marshal() []byte { if t.valueArr != nil { var dst []byte @@ -66,7 +52,7 @@ func (t *tagValue) marshal() []byte { dst = append(dst, t.valueArr[i]...) continue } - dst = marshalVarArray(dst, t.valueArr[i]) + dst = encoding.MarshalVarArray(dst, t.valueArr[i]) } return dst } @@ -88,43 +74,22 @@ func releaseTagValue(v *tagValue) { var tagValuePool = pool.Register[*tagValue]("trace-tagValue") -const ( - entityDelimiter = '|' - escape = '\\' -) - -func marshalVarArray(dest, src []byte) []byte { - if bytes.IndexByte(src, entityDelimiter) < 0 && bytes.IndexByte(src, escape) < 0 { - dest = append(dest, src...) - dest = append(dest, entityDelimiter) - return dest - } - for _, b := range src { - if b == entityDelimiter || b == escape { - dest = append(dest, escape) - } - dest = append(dest, b) - } - dest = append(dest, entityDelimiter) - return dest -} - func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) { if len(src) == 0 { return nil, nil, errors.New("empty entity value") } - if src[0] == entityDelimiter { + if src[0] == encoding.EntityDelimiter { return dest, src[1:], nil } for len(src) > 0 { switch { - case src[0] == escape: + case src[0] == encoding.Escape: if len(src) < 2 { return nil, nil, errors.New("invalid escape character") } src = src[1:] dest = append(dest, src[0]) - case src[0] == entityDelimiter: + case src[0] == encoding.EntityDelimiter: return dest, src[1:], nil default: dest = append(dest, src[0]) @@ -134,19 +99,6 @@ func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) { return nil, nil, errors.New("invalid variable array") } -type tagValues struct { - tag string - values []*tagValue -} - -func (t *tagValues) reset() { - t.tag = "" - for i := range t.values { - releaseTagValue(t.values[i]) - } - t.values = t.values[:0] -} - type traces struct { traceIDs []string timestamps []int64 diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go index 9873b4d8..ed3c74dc 100644 --- a/banyand/trace/write_data.go +++ b/banyand/trace/write_data.go @@ -212,15 +212,11 @@ func (s *syncCallback) handleTraceFileChunk(ctx *queue.ChunkedSyncPartContext, c partCtx.writers.spanWriter.MustWrite(chunk) case fileName == traceIDFilterFilename: if partCtx.memPart != nil { - if err := s.handleTraceIDFilterChunk(partCtx, chunk); err != nil { - return fmt.Errorf("failed to handle traceID filter chunk: %w", err) - } + s.handleTraceIDFilterChunk(partCtx, chunk) } case fileName == tagTypeFilename: if partCtx.memPart != nil { - if err := s.handleTagTypeChunk(partCtx, chunk); err != nil { - return fmt.Errorf("failed to handle tag type chunk: %w", err) - } + s.handleTagTypeChunk(partCtx, chunk) } case strings.HasPrefix(fileName, traceTagsPrefix): tagName := fileName[len(traceTagsPrefix):] @@ -237,12 +233,10 @@ func (s *syncCallback) handleTraceFileChunk(ctx *queue.ChunkedSyncPartContext, c return nil } -func (s *syncCallback) handleTraceIDFilterChunk(partCtx *syncPartContext, chunk []byte) error { +func (s *syncCallback) handleTraceIDFilterChunk(partCtx *syncPartContext, chunk []byte) { partCtx.traceIDFilterBuffer = append(partCtx.traceIDFilterBuffer, chunk...) - return nil } -func (s *syncCallback) handleTagTypeChunk(partCtx *syncPartContext, chunk []byte) error { +func (s *syncCallback) handleTagTypeChunk(partCtx *syncPartContext, chunk []byte) { partCtx.tagTypeBuffer = append(partCtx.tagTypeBuffer, chunk...) - return nil }
