Copilot commented on code in PR #1189:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1189#discussion_r3458785120


##########
pkg/pipeline/sdk/decode.go:
##########
@@ -0,0 +1,120 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sdk
+
+import (
+       "fmt"
+
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// Value is a single decoded tag value. The accessor matching ValueType returns
+// the decoded datum; the others return their zero value. A nil raw value
+// decodes to a null Value (IsNull reports true).
+type Value struct {
+       str       string
+       bytes     []byte
+       strArr    []string
+       intArr    []int64
+       int64Val  int64
+       floatVal  float64
+       valueType pbv1.ValueType
+       null      bool
+}
+
+// IsNull reports whether the tag was absent on this row.
+func (v Value) IsNull() bool { return v.null }
+
+// ValueType returns the type tag of the decoded value.
+func (v Value) ValueType() pbv1.ValueType { return v.valueType }
+
+// Str returns the string value (valid for ValueTypeStr).
+func (v Value) Str() string { return v.str }
+
+// Int64 returns the integer value (valid for ValueTypeInt64 and, as unix
+// nanoseconds, ValueTypeTimestamp).
+func (v Value) Int64() int64 { return v.int64Val }
+
+// Float64 returns the float value (valid for ValueTypeFloat64).
+func (v Value) Float64() float64 { return v.floatVal }
+
+// Bytes returns the raw value (valid for ValueTypeBinaryData).
+func (v Value) Bytes() []byte { return v.bytes }
+
+// StrArr returns the string-array value (valid for ValueTypeStrArr).
+func (v Value) StrArr() []string { return v.strArr }
+
+// Int64Arr returns the integer-array value (valid for ValueTypeInt64Arr).
+func (v Value) Int64Arr() []int64 { return v.intArr }
+
+// At decodes the value at the given span row. A nil element decodes to a null
+// Value. It returns an error if row is out of range.
+func (c *TagColumn) At(row int) (Value, error) {
+       if row < 0 || row >= len(c.Values) {
+               return Value{}, fmt.Errorf("tag %q: row %d out of range 
[0,%d)", c.Name, row, len(c.Values))
+       }
+       return DecodeTagValue(c.ValueType, c.Values[row])
+}
+
+// DecodeTagValue decodes one marshaled tag value, as stored in the native 
trace
+// block, into a typed Value. It mirrors the engine's own per-row decode so a
+// plugin never needs to import banyand/trace internals. A nil raw value yields
+// a null Value.
+func DecodeTagValue(valueType pbv1.ValueType, raw []byte) (Value, error) {
+       if raw == nil {
+               return Value{valueType: valueType, null: true}, nil
+       }
+       switch valueType {
+       case pbv1.ValueTypeStr:
+               return Value{valueType: valueType, str: string(raw)}, nil
+       case pbv1.ValueTypeInt64:
+               return Value{valueType: valueType, int64Val: 
convert.BytesToInt64(raw)}, nil
+       case pbv1.ValueTypeFloat64:
+               return Value{valueType: valueType, floatVal: 
convert.BytesToFloat64(raw)}, nil
+       case pbv1.ValueTypeBinaryData:
+               return Value{valueType: valueType, bytes: raw}, nil
+       case pbv1.ValueTypeTimestamp:
+               return Value{valueType: valueType, int64Val: 
convert.BytesToInt64(raw)}, nil

Review Comment:
   `DecodeTagValue` validates the byte length for `ValueTypeInt64Arr`, but it 
does not validate fixed-width encodings (`Int64`, `Float64`, `Timestamp`). If 
`raw` is not exactly 8 bytes, the `convert.BytesToInt64/Float64` calls can 
decode garbage or panic depending on their implementation. Add explicit 
`len(raw) == 8` checks for these fixed-width cases and return a descriptive 
error when the length is invalid.



##########
pkg/pipeline/sdk/_example/segment-tail-sampler/main.go:
##########
@@ -0,0 +1,315 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Command segment-tail-sampler is the reference post-trace sampler plugin from
+// docs/design/post-trace-pipeline.md §6.1. It is a worked example of the
+// pkg/pipeline/sdk contract and illustrates the three things a real plugin
+// must do:
+//
+//  1. Parse the operator-supplied config. SamplerPlugin.config is a
+//     google.protobuf.Struct; the engine serializes it to canonical JSON and
+//     hands it to NewSampler as bytes, which this plugin unmarshals into its
+//     own typed config.
+//  2. Declare a projection (Project) so the engine materializes only the
+//     columns the verdict reads — here a handful of tag columns, plus the
+//     span-id column only when a span-count rule is configured.
+//  3. Extract tags and spans from the vectorized batch in Decide — decoding tag
+//     values by their ValueType and reading the span-id column.
+//
+// Build it as a Go plugin (it is deliberately under an _example directory so
+// `go build ./...` and the linters skip it):
+//
+//     go build -buildmode=plugin -trimpath \
+//       -o segment-tail-sampler.so \
+//       ./pkg/pipeline/sdk/_example/segment-tail-sampler
+//
+// It must be built with the same Go toolchain and the same pinned
+// pkg/pipeline/sdk as the running data node (see §2.5).
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "hash/fnv"
+       "math"
+       "regexp"
+       "strconv"
+       "time"
+
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pipeline/sdk"
+)
+
+// ABIVersion re-exports the SDK ABI version. The engine refuses to load the
+// plugin unless this equals its own compiled sdk.ABIVersion.
+var ABIVersion = sdk.ABIVersion
+
// Plugin configuration and the compiled sampler state built from it.
type (
	// tagRule is one sure-keep tag predicate. Exactly one matcher field is
	// honored, checked in the order exists, equals, in, regex. re holds Regex
	// compiled once by NewSampler and stays nil when Regex is empty.
	tagRule struct {
		re     *regexp.Regexp
		In     []string `json:"in"`
		Regex  string   `json:"regex"`
		TagKey string   `json:"tag_key"`
		Equals string   `json:"equals"`
		Exists bool     `json:"exists"`
	}

	// config is the JSON shape the operator sets in SamplerPlugin.config.
	// duration_threshold uses time.ParseDuration syntax (for example "2s").
	config struct {
		DurationThreshold string    `json:"duration_threshold"`
		ErrorTag          string    `json:"error_tag"`
		KeepTagRules      []tagRule `json:"keep_tag_rules"`
		HealthySampleRate float64   `json:"healthy_sample_rate"`
		MinSpanCount      int       `json:"min_span_count"`
		KeepErrors        bool      `json:"keep_errors"`
	}

	// segmentTailSampler keeps a trace when any sure-keep rule matches, and
	// otherwise admits a deterministic fraction of the healthy remainder.
	segmentTailSampler struct {
		errorTag          string
		rules             []tagRule
		requiredTags      []string
		durationThreshold time.Duration
		healthySampleRate float64
		minSpanCount      int
		keepErrors        bool
		wantSpanIDs       bool
	}
)
+
+// NewSampler is the constructor symbol the engine looks up. It parses and
+// validates the operator config, compiles any regex matchers, and computes the
+// projection. A returned error rejects the plugin at admission.
+func NewSampler(configJSON []byte) (sdk.Sampler, error) {
+       var c config
+       if len(configJSON) > 0 {
+               if err := json.Unmarshal(configJSON, &c); err != nil {
+                       return nil, fmt.Errorf("segment-tail-sampler: invalid 
config JSON: %w", err)
+               }
+       }
+       if c.HealthySampleRate < 0 || c.HealthySampleRate > 1 {
+               return nil, fmt.Errorf("segment-tail-sampler: 
healthy_sample_rate %v out of [0,1]", c.HealthySampleRate)
+       }
+       s := &segmentTailSampler{
+               rules:             c.KeepTagRules,
+               healthySampleRate: c.HealthySampleRate,
+               keepErrors:        c.KeepErrors,
+               errorTag:          c.ErrorTag,
+               minSpanCount:      c.MinSpanCount,
+       }
+       if s.errorTag == "" {
+               s.errorTag = "is_error"
+       }
+       if c.DurationThreshold != "" {
+               d, err := time.ParseDuration(c.DurationThreshold)
+               if err != nil {
+                       return nil, fmt.Errorf("segment-tail-sampler: invalid 
duration_threshold %q: %w", c.DurationThreshold, err)
+               }
+               if d <= 0 {
+                       return nil, fmt.Errorf("segment-tail-sampler: 
duration_threshold must be positive, got %v", d)
+               }
+               s.durationThreshold = d
+       }
+
+       // Build the projection: the error tag (when keep_errors is set) and 
every
+       // rule's tag key. Compile regex matchers once, here, not per batch.
+       tagSet := make(map[string]struct{})
+       if s.keepErrors {
+               tagSet[s.errorTag] = struct{}{}
+       }
+       for i := range s.rules {
+               if s.rules[i].TagKey == "" {
+                       return nil, fmt.Errorf("segment-tail-sampler: 
keep_tag_rules[%d] has empty tag_key", i)
+               }
+               if s.rules[i].Regex != "" {
+                       re, err := regexp.Compile(s.rules[i].Regex)
+                       if err != nil {
+                               return nil, fmt.Errorf("segment-tail-sampler: 
keep_tag_rules[%d] bad regex %q: %w", i, s.rules[i].Regex, err)
+                       }
+                       s.rules[i].re = re
+               }
+               tagSet[s.rules[i].TagKey] = struct{}{}
+       }
+       s.requiredTags = make([]string, 0, len(tagSet))
+       for k := range tagSet {
+               s.requiredTags = append(s.requiredTags, k)
+       }

Review Comment:
   `requiredTags` is built by iterating a map, which makes the projection tag 
list nondeterministically ordered across runs. Even if the engine treats the 
tag projection as a set, keeping this slice stable improves reproducibility 
(debugging, tests, logs, and any potential downstream caching keyed by the 
slice). Sort `s.requiredTags` before returning it from `Project()`.



##########
pkg/pipeline/sdk/_example/segment-tail-sampler/main.go:
##########
@@ -0,0 +1,315 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Command segment-tail-sampler is the reference post-trace sampler plugin from
+// docs/design/post-trace-pipeline.md §6.1. It is a worked example of the
+// pkg/pipeline/sdk contract and illustrates the three things a real plugin
+// must do:
+//
+//  1. Parse the operator-supplied config. SamplerPlugin.config is a
+//     google.protobuf.Struct; the engine serializes it to canonical JSON and
+//     hands it to NewSampler as bytes, which this plugin unmarshals into its
+//     own typed config.
+//  2. Declare a projection (Project) so the engine materializes only the
+//     columns the verdict reads — here a handful of tag columns, plus the
+//     span-id column only when a span-count rule is configured.
+//  3. Extract tags and spans from the vectorized batch in Decide — decoding tag
+//     values by their ValueType and reading the span-id column.
+//
+// Build it as a Go plugin (it is deliberately under an _example directory so
+// `go build ./...` and the linters skip it):
+//
+//     go build -buildmode=plugin -trimpath \
+//       -o segment-tail-sampler.so \
+//       ./pkg/pipeline/sdk/_example/segment-tail-sampler
+//
+// It must be built with the same Go toolchain and the same pinned
+// pkg/pipeline/sdk as the running data node (see §2.5).
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "hash/fnv"
+       "math"
+       "regexp"
+       "sort"
+       "strconv"
+       "time"
+
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pipeline/sdk"
+)
+
+// ABIVersion re-exports the SDK ABI version. The engine refuses to load the
+// plugin unless this equals its own compiled sdk.ABIVersion.
+var ABIVersion = sdk.ABIVersion
+
// Plugin configuration and the compiled sampler state built from it.
type (
	// tagRule is one sure-keep tag predicate. Exactly one matcher field is
	// honored, checked in the order exists, equals, in, regex. re holds Regex
	// compiled once by NewSampler and stays nil when Regex is empty.
	tagRule struct {
		re     *regexp.Regexp
		In     []string `json:"in"`
		Regex  string   `json:"regex"`
		TagKey string   `json:"tag_key"`
		Equals string   `json:"equals"`
		Exists bool     `json:"exists"`
	}

	// config is the JSON shape the operator sets in SamplerPlugin.config.
	// duration_threshold uses time.ParseDuration syntax (for example "2s").
	config struct {
		DurationThreshold string    `json:"duration_threshold"`
		ErrorTag          string    `json:"error_tag"`
		KeepTagRules      []tagRule `json:"keep_tag_rules"`
		HealthySampleRate float64   `json:"healthy_sample_rate"`
		MinSpanCount      int       `json:"min_span_count"`
		KeepErrors        bool      `json:"keep_errors"`
	}

	// segmentTailSampler keeps a trace when any sure-keep rule matches, and
	// otherwise admits a deterministic fraction of the healthy remainder.
	segmentTailSampler struct {
		errorTag          string
		rules             []tagRule
		requiredTags      []string
		durationThreshold time.Duration
		healthySampleRate float64
		minSpanCount      int
		keepErrors        bool
		wantSpanIDs       bool
	}
)
+
+// NewSampler is the constructor symbol the engine looks up. It parses and
+// validates the operator config, compiles any regex matchers, and computes the
+// projection. A returned error rejects the plugin at admission.
+func NewSampler(configJSON []byte) (sdk.Sampler, error) {
+       var c config
+       if len(configJSON) > 0 {
+               if err := json.Unmarshal(configJSON, &c); err != nil {
+                       return nil, fmt.Errorf("segment-tail-sampler: invalid 
config JSON: %w", err)
+               }
+       }
+       if c.HealthySampleRate < 0 || c.HealthySampleRate > 1 {
+               return nil, fmt.Errorf("segment-tail-sampler: 
healthy_sample_rate %v out of [0,1]", c.HealthySampleRate)
+       }
+       s := &segmentTailSampler{
+               rules:             c.KeepTagRules,
+               healthySampleRate: c.HealthySampleRate,
+               keepErrors:        c.KeepErrors,
+               errorTag:          c.ErrorTag,
+               minSpanCount:      c.MinSpanCount,
+       }
+       if s.errorTag == "" {
+               s.errorTag = "is_error"
+       }
+       if c.DurationThreshold != "" {
+               d, err := time.ParseDuration(c.DurationThreshold)
+               if err != nil {
+                       return nil, fmt.Errorf("segment-tail-sampler: invalid 
duration_threshold %q: %w", c.DurationThreshold, err)
+               }
+               if d <= 0 {
+                       return nil, fmt.Errorf("segment-tail-sampler: 
duration_threshold must be positive, got %v", d)
+               }
+               s.durationThreshold = d
+       }
+
+       // Build the projection: the error tag (when keep_errors is set) and 
every
+       // rule's tag key. Compile regex matchers once, here, not per batch.
+       tagSet := make(map[string]struct{})
+       if s.keepErrors {
+               tagSet[s.errorTag] = struct{}{}
+       }
+       for i := range s.rules {
+               if s.rules[i].TagKey == "" {
+                       return nil, fmt.Errorf("segment-tail-sampler: 
keep_tag_rules[%d] has empty tag_key", i)
+               }
+               if s.rules[i].Regex != "" {
+                       re, err := regexp.Compile(s.rules[i].Regex)
+                       if err != nil {
+                               return nil, fmt.Errorf("segment-tail-sampler: 
keep_tag_rules[%d] bad regex %q: %w", i, s.rules[i].Regex, err)
+                       }
+                       s.rules[i].re = re
+               }
+               tagSet[s.rules[i].TagKey] = struct{}{}
+       }
+       s.requiredTags = make([]string, 0, len(tagSet))
+       for k := range tagSet {
+               s.requiredTags = append(s.requiredTags, k)
+       }
+       // A span-count rule reads the span-id column, so project it on demand.
+       s.wantSpanIDs = s.minSpanCount > 0
+       return s, nil
+}
+
+// Kind reports the sampler kind, satisfying the generic sdk.Plugin interface
+// that sdk.Sampler embeds.
+func (s *segmentTailSampler) Kind() sdk.Kind { return sdk.KindSampler }
+
+// Project declares the columns the verdict reads: the rule/error tag columns,
+// plus the span-id column only when a span-count rule is configured. Span
+// bodies are never read, so Spans stays false and the merge raw fast path is
+// preserved.
+func (s *segmentTailSampler) Project() sdk.Projection {
+       return sdk.Projection{Tags: s.requiredTags, SpanIDs: s.wantSpanIDs}
+}
+
+// Decide returns a keep-mask aligned to batch.Traces. The batch is read-only.
+func (s *segmentTailSampler) Decide(batch *sdk.TraceBatch) (sdk.Verdict, 
error) {
+       keep := make([]bool, len(batch.Traces))
+       for i := range batch.Traces {
+               k, err := s.keepTrace(&batch.Traces[i])
+               if err != nil {
+                       return sdk.Verdict{}, err
+               }
+               keep[i] = k
+       }
+       return sdk.Verdict{Keep: keep}, nil
+}
+
+// Close releases resources; this sampler holds none.
+func (s *segmentTailSampler) Close() error { return nil }
+
+// keepTrace applies the sure-keep rules, then the deterministic healthy 
sample.
+func (s *segmentTailSampler) keepTrace(b *sdk.TraceBlock) (bool, error) {
+       // Duration is free from the intrinsic MinTS/MaxTS — no decode.
+       if s.durationThreshold > 0 && time.Duration(b.MaxTS-b.MinTS) >= 
s.durationThreshold {
+               return true, nil
+       }
+       // Error keep: decode the error tag column and look for any truthy row.
+       if s.keepErrors {
+               hit, err := s.hasError(b)
+               if err != nil {
+                       return false, err
+               }
+               if hit {
+                       return true, nil
+               }
+       }
+       // Sure-keep tag rules.
+       for i := range s.rules {
+               hit, err := matchRule(b, &s.rules[i])
+               if err != nil {
+                       return false, err
+               }
+               if hit {
+                       return true, nil
+               }
+       }
+       // Span-count rule: read the span-id column the projection requested.
+       if s.minSpanCount > 0 && b.Len() >= s.minSpanCount {
+               return true, nil
+       }
+       // Healthy remainder: deterministic hash(trace_id) < rate, stable across
+       // re-evaluation at merge and finalization.
+       if s.healthySampleRate > 0 && sampleFraction(b.TraceID) < 
s.healthySampleRate {
+               return true, nil
+       }
+       return false, nil
+}
+
+// hasError reports whether the error tag is truthy on any span row.
+func (s *segmentTailSampler) hasError(b *sdk.TraceBlock) (bool, error) {
+       col := b.Tag(s.errorTag)
+       if col == nil {
+               return false, nil
+       }
+       for row := range col.Values {
+               v, err := col.At(row)
+               if err != nil {
+                       return false, err
+               }
+               if v.IsNull() {
+                       continue
+               }
+               switch v.ValueType() {
+               case pbv1.ValueTypeInt64:
+                       if v.Int64() != 0 {
+                               return true, nil
+                       }
+               case pbv1.ValueTypeStr:
+                       if str := v.Str(); str == "true" || str == "1" {
+                               return true, nil
+                       }
+               default:
+               }
+       }
+       return false, nil
+}
+
+// matchRule reports whether the rule matches any span row of the trace.
+func matchRule(b *sdk.TraceBlock, r *tagRule) (bool, error) {
+       col := b.Tag(r.TagKey)
+       if col == nil {
+               return false, nil
+       }
+       for row := range col.Values {
+               v, err := col.At(row)
+               if err != nil {
+                       return false, err
+               }
+               if v.IsNull() {
+                       continue
+               }
+               if r.Exists {
+                       return true, nil
+               }
+               str := stringOf(v)
+               switch {
+               case r.Equals != "":
+                       if str == r.Equals {
+                               return true, nil
+                       }
+               case len(r.In) > 0:
+                       for _, candidate := range r.In {
+                               if str == candidate {
+                                       return true, nil
+                               }
+                       }
+               case r.re != nil:
+                       if r.re.MatchString(str) {
+                               return true, nil
+                       }
+               }
+       }
+       return false, nil
+}
+
+// stringOf renders a decoded value as a string for matching against the rule's
+// string predicates.
+func stringOf(v sdk.Value) string {
+       switch v.ValueType() {
+       case pbv1.ValueTypeStr:
+               return v.Str()
+       case pbv1.ValueTypeInt64, pbv1.ValueTypeTimestamp:
+               return strconv.FormatInt(v.Int64(), 10)
+       case pbv1.ValueTypeFloat64:
+               return strconv.FormatFloat(v.Float64(), 'g', -1, 64)
+       case pbv1.ValueTypeBinaryData:
+               return string(v.Bytes())
+       default:
+               return ""
+       }
+}
+
// sampleFraction maps a trace_id to a stable fraction in [0,1) via FNV-1a, so
// the keep decision is deterministic and reproducible across passes.
func sampleFraction(traceID string) float64 {
	h := fnv.New64a()
	// hash.Hash's Write never returns an error, so it is safe to discard.
	_, _ = h.Write([]byte(traceID))
	return hashFraction(h.Sum64())
}

// hashFraction maps a 64-bit hash uniformly onto [0,1).
//
// Dividing the full hash by float64(math.MaxUint64) is not enough. That
// divisor rounds to 2^64, and so does every hash >= 2^64-1024, so those hashes
// would map to exactly 1.0 and could never be admitted, even at
// healthy_sample_rate == 1. Keeping only the top 53 bits (the float64
// mantissa width) and scaling by 2^-53 is exact and always yields a value < 1.
func hashFraction(h uint64) float64 {
	return math.Ldexp(float64(h>>11), -53)
}

Review Comment:
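   The comment says the result is in `[0,1)`, but dividing by `math.MaxUint64` 
yields `1.0` whenever `float64(Sum64())` rounds up to 2^64. That happens for 
every hash >= 2^64-1024, not only when `Sum64() == MaxUint64`, so the range is 
actually `[0,1]`. This can cause an edge-case where `sampleFraction` returns 
exactly `1` and is never `< healthy_sample_rate`, even when 
`healthy_sample_rate == 1`. Use a denominator that ensures the result is 
strictly less than 1. Note that dividing by `float64(math.MaxUint64)+1` does 
not achieve this: `float64(math.MaxUint64)` already rounds to 2^64, and the 
added 1 is lost to rounding. Instead, keep only the top 53 bits (e.g. 
`math.Ldexp(float64(Sum64()>>11), -53)`), or explicitly clamp `1` down.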



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to