hanahmily commented on code in PR #1200:
URL:
https://github.com/apache/skywalking-banyandb/pull/1200#discussion_r3506744186
##########
fodc/agent/internal/cmd/root.go:
##########
@@ -139,6 +154,20 @@ func init() {
"Directory where lifecycle sidecar writes report files")
rootCmd.Flags().DurationVar(&lifecycleCacheTTL, "lifecycle-cache-ttl", 10*time.Minute,
"TTL for cached lifecycle data. After expiry, the next collection call refreshes the cache")
+ rootCmd.Flags().BoolVar(&pressureProfilerEnabled, "pressure-profiler-enabled", true,
+ "Enable automatic heap+goroutine pprof capture when the monitored container's RSS approaches its cgroup memory limit")
+ rootCmd.Flags().IntVar(&pressureTriggerPercent, "pressure-profiler-trigger-percent", defaultPressureTriggerPercent,
Review Comment:
`--pressure-profiler-trigger-percent` is unvalidated. In `OnPollComplete`
the threshold is computed as `limit * TriggerPercent / 100`, so:
- `0` (or a negative value, which `IntVar` accepts) makes `threshold <= 0`, so
  `rss < threshold` is never true and capture fires on every poll regardless of
  real pressure, throttled only by the cooldown and the capturing CAS;
- a value `> 100` makes `threshold > limit`, which silently disables capture
  until RSS already exceeds the cgroup limit (i.e. the container is effectively
  already OOMing).
Both are silent misconfigurations that produce no startup error. Please add a
bounds check in `validateFlags()` (e.g. reject `< 1 || > 100`) with a clear
message, matching how the other bounded flags are validated.
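
A minimal sketch of the kind of bounds check meant here. `validateTriggerPercent` and `thresholdBytes` are hypothetical helpers written for illustration, not code from this PR; in practice the check would live inside the existing `validateFlags()`, and the error wording is only a suggestion.

```go
package main

import "fmt"

// validateTriggerPercent is a hypothetical helper: it rejects values outside
// [1, 100] so that a misconfigured threshold fails at startup instead of
// silently changing capture behavior.
func validateTriggerPercent(percent int) error {
	if percent < 1 || percent > 100 {
		return fmt.Errorf("--pressure-profiler-trigger-percent must be between 1 and 100, got %d", percent)
	}
	return nil
}

// thresholdBytes mirrors the formula quoted in the review comment
// (limit * TriggerPercent / 100). It is only safe to call after validation.
func thresholdBytes(limit uint64, percent int) uint64 {
	return limit * uint64(percent) / 100
}

func main() {
	const limit = uint64(1 << 30) // assumed 1 GiB cgroup limit, for illustration
	for _, p := range []int{-5, 0, 80, 100, 101} {
		if err := validateTriggerPercent(p); err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Printf("percent=%d threshold=%d bytes\n", p, thresholdBytes(limit, p))
	}
}
```

With this in place, `0`, negative values, and values above `100` are rejected before the poller ever computes a threshold.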
##########
fodc/proxy/internal/pressure/aggregator.go:
##########
@@ -0,0 +1,280 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package pressure aggregates memory-pressure pprof profile metadata from all agents.
+package pressure
+
+import (
+ "context"
+ "strings"
+ "sync"
+ "time"
+
+ fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+ "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// listTimeout bounds how long CollectProfiles waits for agents to finish streaming their
+// metadata. It is a fallback only: the collection normally returns as soon as every contacted
+// agent signals completion, well before this elapses.
+const listTimeout = 10 * time.Second
+
+// ProfileInfo is one pprof profile's metadata in the HTTP list response.
+type ProfileInfo struct {
+ Type string `json:"type"`
+ Filename string `json:"filename"`
+ Filepath string `json:"filepath"`
+ Format string `json:"format"`
+ SizeBytes int64 `json:"size_bytes"`
+}
+
+// AggregatedPressureProfile enriches a capture event with the agent's identity.
+type AggregatedPressureProfile struct {
+ CapturedAt time.Time `json:"captured_at"`
+ AgentID string `json:"agent_id"`
+ PodName string `json:"pod_name"`
+ Role string `json:"role"`
+ ProfileID string `json:"profile_id"`
+ SourceEndpoint string `json:"source_endpoint"`
+ Profiles []ProfileInfo `json:"profiles"`
+ RSSBytes uint64 `json:"rss_bytes"`
+ CgroupLimitBytes uint64 `json:"cgroup_limit_bytes"`
+ ThresholdBytes uint64 `json:"threshold_bytes"`
+ TriggerPercent uint32 `json:"trigger_percent"`
+}
+
+// Filter narrows the listed/collected profiles by role and pod name.
+type Filter struct {
+ Role string
+ PodName string
+}
+
+// RequestSender drives the agents to stream their profile metadata. CollectList sends a
+// list command to every given agent under one request and returns a channel closed once all
+// of them have signaled completion.
+type RequestSender interface {
+ CollectList(agentIDs []string) <-chan struct{}
+}
+
+// Aggregator keeps a dedup cache of capture-event metadata nested by agentID -> profileID.
+// On each list request it pulls fresh metadata from every matching agent, waits a short
+// window, then returns the cache snapshot. The authoritative copy lives on the agent disk.
+type Aggregator struct {
+ registry *registry.AgentRegistry
+ grpcService RequestSender
+ // cache is the live, queryable set: agentID -> profileID -> record.
+ cache map[string]map[string]*AggregatedPressureProfile
+ // staging holds the records of an in-progress list round, same shape as cache. A round's
+ // records land here as they stream in and the whole per-agent map is promoted to cache in
+ // one swap on ListComplete (FinalizeAgentList), so events the agent has evicted from its
+ // disk drop out of the cache instead of lingering as unservable entries.
+ staging map[string]map[string]*AggregatedPressureProfile
+ log *logger.Logger
+ cacheMu sync.RWMutex
+ mu sync.RWMutex
+}
+
+// NewAggregator creates a new Aggregator instance.
+func NewAggregator(reg *registry.AgentRegistry, grpcService RequestSender, log *logger.Logger) *Aggregator {
+ return &Aggregator{
+ registry: reg,
+ grpcService: grpcService,
+ log: log,
+ cache: make(map[string]map[string]*AggregatedPressureProfile),
+ staging: make(map[string]map[string]*AggregatedPressureProfile),
+ }
+}
+
+// SetGRPCService sets the gRPC service used to request profiles from agents.
+func (a *Aggregator) SetGRPCService(grpcService RequestSender) {
+ a.mu.Lock()
+ defer a.mu.Unlock()
+ a.grpcService = grpcService
+}
+
+// RemoveAgent drops all cached events for an agent (called on disconnect).
+func (a *Aggregator) RemoveAgent(agentID string) {
+ a.cacheMu.Lock()
+ defer a.cacheMu.Unlock()
+ delete(a.cache, agentID)
+ delete(a.staging, agentID)
+}
+
+// ProcessProfileFromAgent caches the metadata of one capture event, enriched with the
+// agent's pod name and role.
+func (a *Aggregator) ProcessProfileFromAgent(agentID string, agentInfo *registry.AgentInfo, record *fodcv1.PressureProfileRecord) {
+ if record == nil {
+ return
+ }
+ profiles := make([]ProfileInfo, 0, len(record.Profiles))
+ for _, p := range record.Profiles {
+ profiles = append(profiles, ProfileInfo{
+ Type: p.Type,
+ Filename: p.Filename,
+ Filepath: p.Filepath,
+ Format: p.Format,
+ SizeBytes: p.SizeBytes,
+ })
+ }
+ agg := &AggregatedPressureProfile{
+ AgentID: agentID,
+ PodName: agentInfo.AgentIdentity.PodName,
+ Role: agentInfo.AgentIdentity.Role,
+ ProfileID: record.ProfileId,
+ SourceEndpoint: record.SourceEndpoint,
+ RSSBytes: record.RssBytes,
+ CgroupLimitBytes: record.CgroupLimitBytes,
+ TriggerPercent: record.TriggerPercent,
+ ThresholdBytes: record.ThresholdBytes,
+ Profiles: profiles,
+ }
+ if record.CapturedAt != nil {
+ agg.CapturedAt = record.CapturedAt.AsTime()
+ }
+
+ a.cacheMu.Lock()
+ agentStaging := a.staging[agentID]
+ if agentStaging == nil {
+ agentStaging = make(map[string]*AggregatedPressureProfile)
+ a.staging[agentID] = agentStaging
Review Comment:
This can repopulate `staging[agentID]` after `RemoveAgent` has already
cleared it. Suppose a `Record` message for agent X is being processed here
while X's connection tears down (`cleanupConnection -> RemoveAgent(X)` on
another goroutine). If `RemoveAgent` runs first (deleting `staging[X]`) and
this write runs second (recreating `staging[X]`), an orphaned staging entry is
left behind. Because X has disconnected, no `ListComplete` arrives, so
`FinalizeAgentList(X)` never runs to promote or clear it; and since agent IDs
are fresh per reconnect, nothing later cleans it up. The entry is never served
(it exists only in `staging`, not in `cache`), so it is functionally harmless,
but it leaks one map per disconnect-with-in-flight-record over the proxy's
lifetime.
Suggest guarding the write so it does not create a staging entry for an agent
that is no longer registered/connected (the caller in `handlePressurePayload`
already resolves `agentInfo` via the registry), or having `RemoveAgent` leave a
short-lived tombstone that `ProcessProfileFromAgent` checks.
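
A rough sketch of the first option (guarding the write), under stated assumptions. The `store` type and its `Register`, `Remove`, and `Stage` methods are hypothetical stand-ins for the `Aggregator` and its staging map, not the PR's actual API. The point is that the liveness check and the staging write happen under the same mutex that `Remove` takes, so a late record cannot recreate an entry after removal.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical, simplified stand-in for the Aggregator's staging state.
type store struct {
	mu      sync.Mutex
	active  map[string]struct{}          // agents currently considered connected
	staging map[string]map[string]string // agentID -> profileID -> metadata
}

func newStore() *store {
	return &store{
		active:  make(map[string]struct{}),
		staging: make(map[string]map[string]string),
	}
}

// Register marks an agent as connected.
func (s *store) Register(agentID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.active[agentID] = struct{}{}
}

// Remove drops the agent and its staged records in one critical section.
func (s *store) Remove(agentID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.active, agentID)
	delete(s.staging, agentID)
}

// Stage stores a record only if the agent is still active. It reports whether
// the record was accepted; a late record for a removed agent is dropped.
func (s *store) Stage(agentID, profileID, meta string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.active[agentID]; !ok {
		return false
	}
	agentStaging := s.staging[agentID]
	if agentStaging == nil {
		agentStaging = make(map[string]string)
		s.staging[agentID] = agentStaging
	}
	agentStaging[profileID] = meta
	return true
}

// StagedAgents returns how many agents currently have staging entries.
func (s *store) StagedAgents() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.staging)
}

func main() {
	s := newStore()
	s.Register("agent-x")
	fmt.Println(s.Stage("agent-x", "p1", "meta")) // accepted while connected
	s.Remove("agent-x")
	fmt.Println(s.Stage("agent-x", "p2", "meta")) // dropped after removal
	fmt.Println(s.StagedAgents())
}
```

Compared with a tombstone, an active-set check needs no expiry logic, but it does require that the connect and disconnect paths update the set under the same lock as the staging map.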
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.