Copilot commented on code in PR #1174: URL: https://github.com/apache/skywalking-banyandb/pull/1174#discussion_r3407591230
########## banyand/metadata/schema/reader/reader.go: ########## @@ -0,0 +1,209 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package reader provides offline read access to the property-based schema +// catalog (the `_schema` bluge index written by the property schema server). +// It is used by tools that must load schemas without a running schema server, +// e.g. the data-migration CLI reading a backup snapshot or a live PVC mount. +package reader + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/blugelabs/bluge" + "google.golang.org/protobuf/encoding/protojson" + + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema/property" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" +) + +const schemaSearchSize = 200000 + +// Bluge field / directory names of property documents, mirroring the +// (unexported) layout written by banyand/property/db so the offline reader +// can re-open the index via raw bluge. +const ( + propShardDirPrefix = "shard-" + propSourceField = "_source" + propGroupField = "_group" + propEntityIDField = "_entity_id" + propDeleteField = "_deleted" +) + +// Doc is one decoded doc emitted by WalkShard; the caller decides whether +// the kind / group matches what it wants. +type Doc struct { + PropID string + KindName string // schema.Kind String() value: "group" / "stream" / "measure" / ... + Group string // the "group" tag; empty for docs that carry none + SourceJSON string // embedded protobuf JSON ready for kind-specific Unmarshal + ModRev int64 + Deleted bool +} + +// kindsQuery narrows a shard scan to the requested schema kinds by reusing +// the exact per-kind query the schema server issues against the property db +// (property.BuildSchemaQuery → inverted.BuildPropertyQuery), so a term match +// skips loading the stored _source of every other doc (nodes, other kinds) +// instead of match-all'ing the whole shard. No kinds means every doc. +func kindsQuery(kinds []schema.Kind) (bluge.Query, error) { + if len(kinds) == 0 { + return bluge.NewMatchAllQuery(), nil + } + q := bluge.NewBooleanQuery().SetMinShould(1) + for _, k := range kinds { + iq, err := inverted.BuildPropertyQuery(property.BuildSchemaQuery(k, "", "", 0), + propGroupField, propEntityIDField) + if err != nil { + return nil, fmt.Errorf("build %s schema query: %w", k.String(), err) + } + bq, ok := inverted.BlugeQuery(iq) + if !ok { + return nil, fmt.Errorf("unexpected %s schema query type %T", k.String(), iq) + } + q.AddShould(bq) + } + return q, nil +} + +// WalkShard opens one shard of the schema-property bluge index and invokes +// visit() for each doc. Passing kinds narrows the scan to those schema kinds +// via the indexed kind field; no kinds means every doc. +func WalkShard(shardPath string, visit func(Doc) error, kinds ...schema.Kind) error { + blugeReader, err := bluge.OpenReader(bluge.DefaultConfig(shardPath)) + if err != nil { + return fmt.Errorf("open bluge reader: %w", err) + } + defer func() { _ = blugeReader.Close() }() + query, err := kindsQuery(kinds) + if err != nil { + return err + } + dmi, err := blugeReader.Search(context.Background(), + bluge.NewTopNSearch(schemaSearchSize, query)) + if err != nil { + return fmt.Errorf("search schema docs: %w", err) + } + matched := 0 + for { + next, nextErr := dmi.Next() + if nextErr != nil { + return fmt.Errorf("iterate schema docs: %w", nextErr) + } + if next == nil { + if matched >= schemaSearchSize { + return fmt.Errorf("shard %s: schema docs hit the search limit %d; results may be truncated", shardPath, schemaSearchSize) + } Review Comment: `WalkShard` uses `TopNSearch(schemaSearchSize, ...)` and then treats `matched >= schemaSearchSize` as truncation. This will incorrectly fail when a shard legitimately contains exactly `schemaSearchSize` matching docs (no truncation) because TopN returns at most N results. Consider requesting `schemaSearchSize+1` and only erroring when more than `schemaSearchSize` docs are observed. ########## banyand/internal/migration/unionsidx.go: ########## @@ -0,0 +1,325 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package migration + +import ( + "context" + "fmt" + "os" + "path/filepath" + "runtime" + "strings" + "sync" + "sync/atomic" + + "github.com/blugelabs/bluge" + blugesearch "github.com/blugelabs/bluge/search" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" +) + +// The union sidx is rebuilt read-only via raw bluge — no banyandb store +// lifecycle is needed. +const ( + // Larger batches mean fewer bluge merger ticks; the merger loop + // dominates CPU at small batch sizes. 50k keeps the merger work + // off the hot path while still fitting in the per-process heap. + unionSidxBatchSize = 50000 + + segPrefix = "seg-" + sidxDirName = "sidx" + + // Stored bluge field names of sidx (inverted-index) documents, mirroring + // the (unexported) layout written by pkg/index/inverted so this builder + // can re-emit docs read via raw bluge. + sidxDocIDField = "_id" + sidxTimestampField = "_timestamp" + sidxVersionField = "_version" +) + +// BuildGroupUnionSidx walks every srcGroupRoot/seg-*/sidx/ directory +// across every supplied group root, scans every series-index doc, +// deduplicates by SeriesID, and re-emits the surviving docs into a +// fresh bluge index rooted at stagingPath. +// +// The returned path is stagingPath when at least one doc was written; +// it is "" (without error) when no source sidx contained any doc — the +// caller treats this as "no sidx to broadcast" and skips the per-target +// copy. SeriesID is recovered by unmarshaling the source doc's EntityValues. +// logf, when non-nil, receives progress lines (the scan runs minutes-long +// with no other output on large groups). +func BuildGroupUnionSidx(ctx context.Context, srcGroupRoots []string, stagingPath string, logf func(format string, args ...any)) (string, error) { + if logf == nil { + logf = func(string, ...any) {} + } + if err := os.MkdirAll(stagingPath, storage.DirPerm); err != nil { + return "", fmt.Errorf("mkdir staging %q: %w", stagingPath, err) + } + + writer, err := bluge.OpenWriter(bluge.DefaultConfig(stagingPath)) + if err != nil { + return "", fmt.Errorf("open union sidx writer at %q: %w", stagingPath, err) + } + closed := false + defer func() { + if !closed { + _ = writer.Close() + } + }() + + // Collect every source sidx directory across all node + segment + // roots up front so the worker pool can fan out cleanly. + var sidxPaths []string + for _, srcGroupRoot := range srcGroupRoots { + if ctx.Err() != nil { + return "", ctx.Err() + } + segEntries, err := os.ReadDir(srcGroupRoot) + if err != nil { + if os.IsNotExist(err) { + continue + } + return "", fmt.Errorf("read src group root %q: %w", srcGroupRoot, err) + } + for _, se := range segEntries { + if !se.IsDir() || !strings.HasPrefix(se.Name(), segPrefix) { + continue + } + srcSidxPath := filepath.Join(srcGroupRoot, se.Name(), sidxDirName) + info, statErr := os.Stat(srcSidxPath) + if statErr != nil || !info.IsDir() { + continue + } + sidxPaths = append(sidxPaths, srcSidxPath) + } + } + + seen := make(map[common.SeriesID]struct{}, 1_000_000) + var seenMu sync.Mutex + var writerMu sync.Mutex + var insertedAtomic, scannedAtomic, doneAtomic atomic.Int64 + var firstErr atomic.Pointer[error] + + // GOMAXPROCS(0) honors the container CPU quota (automaxprocs); NumCPU + // reports the node's physical cores and would over-fan readers, each + // holding decompressed stored-field blocks. + workerCount := runtime.GOMAXPROCS(0) + if workerCount > len(sidxPaths) { + workerCount = len(sidxPaths) + } + if workerCount < 1 { + workerCount = 1 + } + logf("union sidx: scanning %d source sidx dir(s) under %d root(s) with %d worker(s)", + len(sidxPaths), len(srcGroupRoots), workerCount) + + pathCh := make(chan string) + var wg sync.WaitGroup + workerCtx, cancelWorkers := context.WithCancel(ctx) + defer cancelWorkers() + + for i := 0; i < workerCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for srcSidxPath := range pathCh { + if workerCtx.Err() != nil { + return + } + logf("union sidx: start scanning %s", srcSidxPath) + count, scanned, mergeErr := mergeOneSourceSidxInto(workerCtx, srcSidxPath, writer, seen, &seenMu, &writerMu) + if mergeErr != nil { + e := fmt.Errorf("merge %s: %w", srcSidxPath, mergeErr) + if firstErr.CompareAndSwap(nil, &e) { + cancelWorkers() + } + return + } + insertedAtomic.Add(int64(count)) + scannedAtomic.Add(int64(scanned)) + done := doneAtomic.Add(1) + logf("union sidx: scanned %d/%d sidx dir(s) (%.1f%%): %s", + done, len(sidxPaths), float64(done)*100/float64(len(sidxPaths)), srcSidxPath) + } + }() + } + for _, p := range sidxPaths { + select { + case pathCh <- p: + case <-workerCtx.Done(): + } + } + close(pathCh) + wg.Wait() + if errPtr := firstErr.Load(); errPtr != nil { + return "", *errPtr + } + inserted := int(insertedAtomic.Load()) + scanned := int(scannedAtomic.Load()) + logf("union sidx: scan finished — scanned=%d uniqueSeries=%d dedup-skipped=%d; closing writer (final bluge merge)", + scanned, inserted, scanned-inserted) + + closed = true + if closeErr := writer.Close(); closeErr != nil { + return "", fmt.Errorf("close union sidx writer: %w", closeErr) + } + if inserted == 0 { + _ = os.RemoveAll(stagingPath) + return "", nil + } + return stagingPath, nil +} + +func mergeOneSourceSidxInto( + ctx context.Context, + srcPath string, + dst *bluge.Writer, + seen map[common.SeriesID]struct{}, + seenMu *sync.Mutex, + writerMu *sync.Mutex, +) (inserted, scanned int, err error) { + reader, err := bluge.OpenReader(bluge.DefaultConfig(srcPath)) + if err != nil { + // A sidx directory may exist on disk while carrying no committed + // bluge snapshot — e.g. a fresh segment created by the runtime + // whose sidx writer never received a doc. Treat that as "no + // sidx to merge for this seg" instead of aborting the whole + // per-group union build. + if strings.Contains(err.Error(), "unable to find a usable snapshot") { + return 0, 0, nil + } + return 0, 0, fmt.Errorf("open reader: %w", err) + } + defer func() { _ = reader.Close() }() + + dmi, err := reader.Search(ctx, bluge.NewAllMatches(bluge.NewMatchAllQuery())) + if err != nil { + return 0, 0, fmt.Errorf("search: %w", err) + } + + batch := bluge.NewBatch() + batched := 0 + + flush := func() error { + writerMu.Lock() + defer writerMu.Unlock() + return dst.Batch(batch) + } + + for { + next, nextErr := dmi.Next() + if nextErr != nil { + return inserted, scanned, fmt.Errorf("iterate docs: %w", nextErr) + } + if next == nil { + break + } + scanned++ + + doc, dup, buildErr := buildDocFromMatchLocked(next, seen, seenMu) + if buildErr != nil { + return inserted, scanned, buildErr + } + if dup || doc == nil { + continue + } + batch.Insert(doc) + batched++ + inserted++ + if batched >= unionSidxBatchSize { + if err := flush(); err != nil { + return inserted, scanned, fmt.Errorf("flush batch: %w", err) + } + batch = bluge.NewBatch() + batched = 0 + } + } + if batched > 0 { + if err := flush(); err != nil { + return inserted, scanned, fmt.Errorf("flush tail batch: %w", err) + } + } + return inserted, scanned, nil +} + +// buildDocFromMatchLocked rebuilds one series-index doc from its stored +// fields, deduplicating by SeriesID under seenMu. +func buildDocFromMatchLocked( + match *blugesearch.DocumentMatch, + seen map[common.SeriesID]struct{}, + seenMu *sync.Mutex, +) (*bluge.Document, bool, error) { + var entityValues []byte + type storedField struct { + name string + value []byte + } + var fields []storedField + visitErr := match.VisitStoredFields(func(field string, value []byte) bool { + switch field { + case sidxDocIDField: + entityValues = append([]byte(nil), value...) + default: + fields = append(fields, storedField{ + name: field, + value: append([]byte(nil), value...), + }) + } + return true + }) + if visitErr != nil { + return nil, false, fmt.Errorf("visit stored fields: %w", visitErr) + } + if len(entityValues) == 0 { + return nil, false, nil + } + var series pbv1.Series + if err := series.Unmarshal(entityValues); err != nil { + return nil, false, nil + } + seenMu.Lock() + if _, dup := seen[series.ID]; dup { + seenMu.Unlock() + return nil, true, nil + } + seen[series.ID] = struct{}{} + seenMu.Unlock() + + doc := bluge.NewDocument(string(entityValues)) + for _, f := range fields { + switch f.name { + case sidxTimestampField: + ts, decErr := bluge.DecodeDateTime(f.value) + if decErr != nil { + return nil, false, fmt.Errorf("decode timestamp on series %d: %w", series.ID, decErr) + } + doc.AddField(bluge.NewDateTimeField(f.name, ts).StoreValue()) + case sidxVersionField: + doc.AddField(bluge.NewStoredOnlyField(f.name, f.value)) + default: + // Stream series docs carry only entity values, so this branch is + // unreachable today. Bluge stored fields do not retain index / + // sortable / analyzer properties; a future caller unioning docs + // with secondary fields would lose those properties here. + doc.AddField(bluge.NewKeywordFieldBytes(f.name, f.value).StoreValue()) Review Comment: `buildDocFromMatchLocked` reconstructs every non-internal stored field as `NewKeywordFieldBytes(...).StoreValue()`, which drops the original field’s analyzer and sortable settings. That can change query semantics for any series-index field whose index rule uses a non-keyword analyzer and can break sorting/range behavior that depends on doc-values. The inline comment also says this branch is “unreachable”, but it is exercised for measure/stream tag fields (any stored tag fields show up here). Consider passing (ruleID/tagName -> analyzer/noSort) metadata into the union builder (from the schema reader) so fields can be re-emitted with the correct analyzer + sortable flags, or explicitly validating that all involved rules are keyword+noSort before proceeding. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
