Copilot commented on code in PR #1200:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1200#discussion_r3502569061


##########
fodc/agent/internal/watchdog/watchdog.go:
##########
@@ -320,6 +330,13 @@ func (w *Watchdog) pollAndForward(ctx context.Context) 
(context.Context, error)
                "metric_count": fmt.Sprintf("%d", len(rawMetrics)),
        })
 
+       w.mu.RLock()
+       hooks := w.postPollHooks
+       w.mu.RUnlock()
+       for _, hook := range hooks {
+               hook(ctx)
+       }

Review Comment:
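   `postPollHooks` is read under the mutex, but only the slice header is 
copied; the backing array is still shared with `w.postPollHooks` while this loop 
iterates. An `append` in `AddPostPollHook` that fits within capacity only writes 
past the copied length, so it does not by itself race with this loop, but any 
in-place mutation of existing elements (removing, replacing, or reordering hooks, 
even accidentally) would cause a data race. Copy the backing array under the lock 
(e.g. with `slices.Clone`) before iterating so the snapshot is fully independent.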



##########
fodc/proxy/internal/grpc/service.go:
##########
@@ -732,3 +912,214 @@ func (s *FODCService) RequestDiagnostics(agentID string) 
error {
        }
        return agentConn.sendDiagnosticsRequest()
 }
+
+// fetchChunkTimeout bounds the wait for each download chunk; it resets per 
chunk, so a
+// large but steadily-streaming profile never trips it.
+const fetchChunkTimeout = 30 * time.Second
+
+// StreamPressureProfiles handles the bi-directional memory-pressure pprof 
stream: agents
+// reply to list/fetch commands with metadata records or binary chunks.
+func (s *FODCService) StreamPressureProfiles(stream 
fodcv1.FODCService_StreamPressureProfilesServer) error {
+       ctx := stream.Context()
+       agentID := s.getAgentIDFromContext(ctx)
+       if agentID == "" {
+               agentID = s.getAgentIDFromPeer(ctx)
+               if agentID != "" {
+                       s.logger.Warn().Str("agent_id", agentID).
+                               Msg("Agent ID not found in metadata, using peer 
address fallback (this may be unreliable)")
+               }
+       }
+       if agentID == "" {
+               s.logger.Error().Msg("Agent ID not found in context metadata or 
peer address for pressure profiles stream")
+               return status.Errorf(codes.Unauthenticated, "agent ID not found 
in context or peer address")
+       }
+
+       s.connectionsMu.Lock()
+       existingConn, exists := s.connections[agentID]
+       if exists {
+               existingConn.setPressureProfilesStream(stream)
+               existingConn.updateActivity()
+       } else {
+               s.connections[agentID] = &agentConnection{
+                       agentID:                agentID,
+                       pressureProfilesStream: stream,
+                       lastActivity:           time.Now(),
+               }
+       }
+       s.connectionsMu.Unlock()
+
+       for {
+               req, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       s.logger.Debug().Str("agent_id", agentID).Msg("Pressure 
profiles stream closed by agent")
+                       return nil
+               }
+               if recvErr != nil {
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {
+                               s.logger.Debug().Err(recvErr).Str("agent_id", 
agentID).Msg("Pressure profiles stream closed")
+                       } else if st, ok := status.FromError(recvErr); ok && 
(st.Code() == codes.Canceled || st.Code() == codes.DeadlineExceeded) {
+                               s.logger.Debug().Err(recvErr).Str("agent_id", 
agentID).Msg("Pressure profiles stream closed")
+                       } else {
+                               s.logger.Error().Err(recvErr).Str("agent_id", 
agentID).Msg("Error receiving pressure profiles")
+                       }
+                       return recvErr
+               }
+               s.handlePressurePayload(agentID, req)
+       }
+}
+
+// handlePressurePayload routes one stream message: a metadata record to the 
aggregator,
+// or a download chunk to the waiting fetch handler.
+func (s *FODCService) handlePressurePayload(agentID string, req 
*fodcv1.StreamPressureProfilesRequest) {
+       switch payload := req.Payload.(type) {
+       case *fodcv1.StreamPressureProfilesRequest_Record:
+               if s.pressureAggregator == nil {
+                       return
+               }
+               agentInfo, getErr := s.registry.GetAgentByID(agentID)
+               if getErr != nil {
+                       s.logger.Error().Err(getErr).Str("agent_id", 
agentID).Msg("Failed to get agent info for pressure profile record")
+                       return
+               }
+               s.pressureAggregator.ProcessProfileFromAgent(agentID, 
agentInfo, payload.Record)
+       case *fodcv1.StreamPressureProfilesRequest_Chunk:
+               s.connectionsMu.RLock()
+               conn := s.connections[agentID]
+               s.connectionsMu.RUnlock()
+               if conn != nil {
+                       conn.deliverChunk(payload.Chunk)
+               }
+       case *fodcv1.StreamPressureProfilesRequest_ListComplete:
+               // Promote the agent's staged list into the cache before waking 
the waiting
+               // CollectProfiles, so the snapshot it returns reflects this 
round's full set
+               // (and drops events the agent has evicted).
+               if s.pressureAggregator != nil {
+                       s.pressureAggregator.FinalizeAgentList(agentID)
+               }
+               s.ackList(payload.ListComplete.RequestId, agentID)
+       }
+}
+
+// collectListTTL bounds how long a list waiter lingers in the registry, 
independent of the
+// aggregator's own wait timeout, so a request whose agents never all ack does 
not leak.
+const collectListTTL = 30 * time.Second
+
+// CollectList sends a list_profiles command (under one request id) to every 
agent and returns a
+// channel closed once each successfully-contacted agent has reported 
ListComplete. Agents that
+// cannot be reached are dropped immediately so they never hold the channel 
open.
+func (s *FODCService) CollectList(agentIDs []string) <-chan struct{} {
+       if len(agentIDs) == 0 {
+               done := make(chan struct{})
+               close(done)
+               return done
+       }
+       requestID := uuid.NewString()
+       waiter := &listWaiter{pending: make(map[string]struct{}), done: 
make(chan struct{})}
+       for _, id := range agentIDs {
+               waiter.pending[id] = struct{}{}
+       }
+
+       s.listMu.Lock()
+       s.listWaiters[requestID] = waiter
+       s.listMu.Unlock()
+       // Bound the registry entry's lifetime and guarantee the waiter 
unblocks even if some agent
+       // never acks, regardless of whether collectListTTL stays above the 
aggregator's timeout.
+       time.AfterFunc(collectListTTL, func() {
+               s.listMu.Lock()
+               delete(s.listWaiters, requestID)
+               s.listMu.Unlock()
+               waiter.forceDone()
+       })
+

Review Comment:
   `CollectList` registers each `listWaiter` in `s.listWaiters` but only 
removes it via the fixed TTL timer. Even when all agents ack immediately, the 
entry (and its timer) linger for up to `collectListTTL` (30s), so frequent list 
calls can accumulate many waiters and timers unnecessarily. Remove the waiter from 
the map as soon as `waiter.done` closes, and stop the TTL timer (keep the 
`*time.Timer` returned by `time.AfterFunc` and call its `Stop` method), to avoid 
transient leaks under load.



##########
test/e2e-v2/cases/fodc-pressure/e2e-mac.yaml:
##########
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# LOCAL macOS-ADAPTED copy of e2e.yaml. Identical scenario, but drops the 
CI-only
+# tool-install setup steps (install kubectl / install yq / set PATH) because 
those
+# scripts download Linux binaries that cannot execute on macOS. The local 
machine
+# already has darwin kubectl/python3/curl/go on PATH. Do NOT commit this file.

Review Comment:
   This file is committed into the repository, but its header says "Do NOT 
commit this file." That makes the documentation misleading for future 
maintainers and reviewers. Either remove this file from the PR, or update the 
header comment to reflect that it is intentionally kept in-repo for local macOS 
runs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to