wu-sheng commented on code in PR #911:
URL: 
https://github.com/apache/skywalking-banyandb/pull/911#discussion_r2640329613


##########
docs/design/fodc/proxy.md:
##########
@@ -0,0 +1,941 @@
+# FODC Proxy Development Design
+
+## Overview
+
+The FODC Proxy is the central control plane and data aggregator for the First 
Occurrence Data Collection (FODC) infrastructure. It acts as a unified gateway 
that aggregates observability data from multiple FODC Agents (each co-located 
with a BanyanDB node) and exposes ecosystem-friendly interfaces to external 
systems such as Prometheus and other observability platforms.
+
+The Proxy provides:
+
+1. **Agent Management**: Registration, health monitoring, and lifecycle 
management of FODC Agents
+2. **Metrics Aggregation**: Collects and aggregates metrics from all agents 
with enriched metadata
+3. **Cluster Topology**: Maintains an up-to-date view of cluster topology, 
roles, and node states
+4. **Configuration Collection**: Aggregates and exposes node configurations 
for consistency verification
+
+### Responsibilities
+
+**FODC Proxy Core Responsibilities**
+- Accept bi-directional gRPC connections from FODC Agents
+- Register and track agent lifecycle (online/offline, heartbeat monitoring)
+- Aggregate metrics from all agents with node metadata enrichment
+- Maintain cluster topology view based on agent registrations
+- Collect and expose node configurations for audit and consistency checks
+- Expose unified REST/Prometheus-style interfaces for external consumption
+- Provide proxy-level metrics (health, agent count, RPC latency, etc.)
+
+## Component Design
+
+### 1. Proxy Components
+
+#### 1.1 Agent Registry Component
+
+**Purpose**: Manages the lifecycle and state of all connected FODC Agents
+
+##### Core Responsibilities
+
+- **Agent Registration**: Accepts agent registration requests via gRPC
+- **Health Monitoring**: Tracks agent heartbeat and connection status
+- **State Management**: Maintains agent state (online/offline, last heartbeat 
time)
+- **Topology Building**: Aggregates agent registrations into cluster topology 
view
+- **Connection Management**: Handles connection failures, reconnections, and 
cleanup
+
+##### Core Types
+
+**`AgentInfo`**
+```go
+type AgentInfo struct {
+       NodeID            string                    // Unique node identifier
+       NodeRole          databasev1.Role          // Node role (liaison, 
datanode-hot, etc.)
+       PrimaryAddress    Address                  // Primary agent gRPC address
+       SecondaryAddresses map[string]Address      // Secondary addresses with 
names (e.g., "metrics": Address, "gossip": Address)
+       Labels            map[string]string        // Node labels/metadata
+       RegisteredAt      time.Time                // Registration timestamp
+       LastHeartbeat     time.Time               // Last heartbeat timestamp
+       Status            AgentStatus               // Current agent status
+}
+
+type Address struct {
+       IP   string
+       Port int
+}
+
+type AgentStatus string
+
+const (
+       AgentStatusOnline  AgentStatus = "online"
+       AgentStatusOffline AgentStatus = "unconnected"
+)
+```
+
+**`AgentRegistry`**
+```go
+type AgentRegistry struct {
+       agents    map[AgentKey]*AgentInfo      // Map from agent key 
(IP+port+role+labels) to agent info
+       mu        sync.RWMutex                 // Protects agents map
+       logger    *logger.Logger
+       heartbeatTimeout time.Duration        // Timeout for considering agent 
offline
+}
+
+type AgentKey struct {
+       IP     string                 // Primary IP address
+       Port   int                    // Primary port
+       Role   databasev1.Role       // Node role
+       Labels map[string]string      // Node labels (used for key matching)
+}
+```
+
+##### Key Functions
+
+**`RegisterAgent(ctx context.Context, info *AgentInfo) error`**
+- Registers a new agent or updates existing agent information
+- Creates AgentKey from primary IP + port + role + labels
+- Uses AgentKey as the map key (not nodeID)
+- Validates primary address and role
+- Updates topology view
+- Returns error if registration fails
+
+**`UnregisterAgent(key AgentKey) error`**
+- Removes agent from registry using AgentKey
+- Cleans up associated resources
+- Updates topology view
+- Called in the following scenarios:
+  - When agent's registration stream closes (connection lost)
+  - When agent's all streams are closed and connection is terminated
+  - When agent has been offline longer than `--agent-cleanup-timeout` 
(detected by heartbeat health check)
+  - During graceful shutdown or manual agent removal
+  - When agent explicitly requests unregistration via stream
+
+**`UpdateHeartbeat(key AgentKey) error`**
+- Updates last heartbeat timestamp for agent using AgentKey
+- Marks agent as online if it was offline
+- Returns error if agent not found
+
+**`GetAgent(ip string, port int, role databasev1.Role, labels 
map[string]string) (*AgentInfo, error)`**
+- Retrieves agent information by primary IP + port + role + labels
+- Creates AgentKey from the provided parameters
+- Looks up agent in registry using AgentKey
+- Returns error if agent not found
+
+**`ListAgents() []*AgentInfo`**
+- Returns list of all registered agents
+- Thread-safe read operation
+
+**`ListAgentsByRole(role databasev1.Role) []*AgentInfo`**
+- Returns agents filtered by role
+- Useful for role-specific operations
+
+**`CheckAgentHealth() error`**
+- Periodically checks agent health based on heartbeat timeout
+- Marks agents as offline if heartbeat timeout exceeded
+- Continuously runs heartbeat health checks regardless of cleanup timeout
+- Unregisters agents that have been offline longer than 
`--agent-cleanup-timeout`
+- Agents that cannot maintain connection will be removed after the cleanup 
timeout period
+- Returns aggregated health status
+
+##### Configuration Flags
+
+**`--agent-heartbeat-timeout`**
+- **Type**: `duration`
+- **Default**: `30s`
+- **Description**: Timeout for considering an agent offline if no heartbeat 
received
+
+**`--max-agents`**
+- **Type**: `int`
+- **Default**: `1000`
+- **Description**: Maximum number of agents that can be registered
+
+**`--agent-cleanup-timeout`**
+- **Type**: `duration`
+- **Default**: `5m`
+- **Minimum**: Must be greater than `--agent-heartbeat-timeout`
+- **Description**: Timeout for automatically unregistering agents that have 
been offline. Agents that cannot maintain connection will be removed after 
being offline longer than this timeout. The heartbeat health check continues 
running regardless of this timeout. This timeout must be greater than 
`--agent-heartbeat-timeout` to allow for proper health checking.
+
+#### 1.2 gRPC Server Component
+
+**Purpose**: Handles bi-directional gRPC communication with FODC Agents
+
+##### Core Responsibilities
+
+- **Agent Connection Handling**: Accepts and manages gRPC connections from 
agents
+- **Streaming Support**: Supports bi-directional streaming for metrics
+- **Protocol Implementation**: Implements FODC gRPC service protocol
+- **Connection Lifecycle**: Manages connection establishment, maintenance, and 
cleanup
+
+##### Core Types
+
+**`FODCService`** (gRPC Service Implementation)
+```go
+type FODCService struct {
+       registry        *AgentRegistry
+       metricsAggregator *MetricsAggregator
+       logger          *logger.Logger
+}
+
+// Example gRPC service methods (to be defined in proto)
+// RegisterAgent(stream RegisterAgentRequest) (stream RegisterAgentResponse) 
error
+// StreamMetrics(stream StreamMetricsRequest) (stream StreamMetricsResponse) 
error
+```
+
+**`AgentConnection`**
+```go
+type AgentConnection struct {
+       Key         AgentKey              // Agent key (IP+port+role+labels) 
for registry lookup
+       NodeID      string
+       Stream      grpc.ServerStream
+       Context     context.Context
+       Cancel      context.CancelFunc
+       LastActivity time.Time
+}
+```
+
+##### Key Functions
+
+**`RegisterAgent(stream FODCService_RegisterAgentServer) error`**
+- Handles bi-directional agent registration stream
+- Receives registration requests from agent (includes primary address, role, 
labels)
+- Creates AgentKey from primary IP + port + role + labels
+- Validates registration information
+- Registers agent with AgentRegistry using AgentKey
+- Sends registration responses
+- Maintains stream for heartbeat and re-registration
+
+**`StreamMetrics(stream FODCService_StreamMetricsServer) error`**
+- Handles bi-directional metrics streaming
+- Sends metrics requests from Proxy to agent (on-demand collection)
+- Receives metrics from agent at Proxy in response to requests
+- Proxy initiates by sending a metrics request via StreamMetricsResponse when 
an external client queries metrics
+- Agent responds with StreamMetricsRequest containing the collected metrics
+- Manages stream lifecycle
+
+##### Connection Lifecycle Management
+
+**Stream Closure Handling**
+- When a stream closes (due to network error, agent shutdown, or timeout), the 
gRPC server should:
+  1. Detect stream closure via context cancellation or stream error
+  2. Extract agent key (primary IP + port + role + labels) from the connection
+  3. Check if this is the last active stream for the agent
+  4. If all streams are closed, call `AgentRegistry.UnregisterAgent(agentKey)` 
to clean up
+  5. Update topology to reflect agent offline status
+
+**Graceful vs. Ungraceful Disconnection**
+- **Graceful**: Agent sends explicit disconnect message before closing stream 
→ immediate unregistration
+- **Ungraceful**: Stream closes unexpectedly → unregistration happens after 
detecting all streams closed
+- **Heartbeat Timeout**: Agent marked offline by `CheckAgentHealth()` → 
unregistration occurs after being offline longer than 
`--agent-cleanup-timeout`. Heartbeat health checks continue running regardless.
+
+##### Configuration Flags
+
+**`--grpc-listen-addr`**
+- **Type**: `string`
+- **Default**: `:17900`
+- **Description**: gRPC server address where the Proxy listens for agent 
connections
+
+**`--grpc-max-msg-size`**
+- **Type**: `int`
+- **Default**: `4194304` (4MB)
+- **Description**: Maximum message size for gRPC messages
+
+#### 1.3 Metrics Aggregator Component
+
+**Purpose**: Aggregates and enriches metrics from all agents
+
+##### Core Responsibilities
+
+- **On-Demand Metrics Collection**: Requests metrics from agents via gRPC 
streams when external clients query metrics
+- **Metrics Request Coordination**: Coordinates metrics requests to multiple 
agents concurrently
+- **Metadata Enrichment**: Adds node metadata (role, ID, labels) to metrics
+- **Normalization**: Normalizes metric formats and labels across agents
+- **Time Window Filtering**: Filters metrics by time window when requested by 
external clients (agents filter based on Flight Recorder data)
+
+##### Core Types
+
+**`AggregatedMetric`**
+```go
+type AggregatedMetric struct {
+       Name        string                 // Metric name
+       Labels      map[string]string      // Metric labels (including node 
metadata)
+       Value       float64                // Metric value
+       Timestamp   time.Time              // Metric timestamp
+       NodeID      string                 // Source node ID
+       NodeRole    databasev1.Role       // Source node role
+       Description string                 // Metric description/HELP text
+}
+```
+
+**`MetricsAggregator`**
+```go
+type MetricsAggregator struct {
+       registry      *AgentRegistry
+       grpcService   *FODCService        // For requesting metrics from agents
+       mu            sync.RWMutex
+       logger        *logger.Logger
+}
+
+type MetricsFilter struct {
+       NodeIDs []string              // Filter by specific node IDs (empty = 
all nodes)
+       Role    databasev1.Role       // Filter by node role (optional)
+       StartTime *time.Time          // Start time for time window (optional)
+       EndTime   *time.Time          // End time for time window (optional)
+}
+```
+
+##### Key Functions
+
+**`CollectMetricsFromAgents(ctx context.Context, filter *MetricsFilter) 
([]*AggregatedMetric, error)`**
+- Requests metrics from all agents (or filtered agents) when external client 
queries
+- Sends metrics request via StreamMetrics() to each agent with time window 
filter (if specified)
+- Agents retrieve metrics from Flight Recorder (in-memory storage) filtered by 
time window and respond
+- Waits for StreamMetricsRequest responses from agents (with timeout)
+- Identifies agent connection from stream context and looks up AgentKey
+- Enriches metrics with node metadata (IP, port, role, labels) from AgentKey
+- Combines metrics from all agents into a single list
+- Returns aggregated metrics (not stored, only returned)
+- Returns error if collection fails
+
+**`GetLatestMetrics(ctx context.Context) ([]*AggregatedMetric, error)`**
+- Triggers on-demand collection from all agents
+- Calls CollectMetricsFromAgents() with no time filter to get current metrics
+- Returns latest metrics from all agents
+- Used for `/metrics-windows` endpoint without time parameters
+- Returns error if collection fails
+
+**`GetMetricsWindow(ctx context.Context, startTime, endTime time.Time, filter 
*MetricsFilter) ([]*AggregatedMetric, error)`**
+- Triggers on-demand collection from all agents
+- Calls CollectMetricsFromAgents() with time window filter
+- Agents filter metrics from Flight Recorder by the specified time range
+- Returns metrics within specified time range
+- Used for `/metrics-windows` endpoint with time parameters
+- Returns error if collection fails
+
+##### Configuration Flags
+
+*Note: No configuration flags needed for MetricsAggregator since metrics are 
collected on-demand and not stored.*
+
+#### 1.4 HTTP/REST API Server Component
+
+**Purpose**: Exposes REST and Prometheus-style endpoints for external 
consumption
+
+##### Core Responsibilities
+
+- **REST API**: Implements REST endpoints for cluster topology and 
configuration
+- **Prometheus Integration**: Exposes Prometheus-compatible metrics endpoints
+- **Request Handling**: Handles HTTP requests and routes to appropriate 
handlers
+- **Response Formatting**: Formats responses in appropriate formats (JSON, 
Prometheus text)
+
+##### API Endpoints
+
+**`GET /metrics`**
+- Returns latest metrics from all agents (on-demand collection, not stored in 
Proxy)
+- Includes node metadata (role, ID, labels)
+- Format: Prometheus text format
+- Query parameters:
+  - `node_id` (optional): Filter by node ID
+  - `role` (optional): Filter by role (liaison, datanode-hot, etc.)
+- Used by: Prometheus scrapers, monitoring systems
+
+**`GET /metrics-windows`**
+- Returns metrics from all agents (on-demand collection, not stored in Proxy)
+- Includes node metadata (role, ID, labels)
+- Format: Prometheus text format

Review Comment:
   This can't be the Prometheus text format. Because as a window, it has time 
series values for each metric. We need a JSON or YAML format for this values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to