This is an automated email from the ASF dual-hosted git repository. wu-sheng pushed a commit to branch fix/oom-when-collector-unreachable in repository https://gitbox.apache.org/repos/asf/skywalking-nodejs.git
commit be688d2e6dee02c5d984073038b540029febeeba
Author: Wu Sheng <[email protected]>
AuthorDate: Tue Jun 9 10:24:29 2026 +0800

    fix: prevent OOM when SkyWalking collector is unreachable

    When the collector is down, the trace report loop fired every second and each gRPC failure logged a multi-KB error (with full stack) that piled up in winston's internal stream buffer faster than the transport drained it, eventually exhausting the heap (apache/skywalking#13764).

    - Skip the report attempt while the gRPC channel is not READY; gRPC-js reconnects with its own backoff, and the bounded buffer is retained instead of failing a stream and dropping data every tick.
    - Throttle and slim failure logs in the Trace and Heartbeat clients: at most one line per 30s carrying a suppressed count, reduced to the error code/message so no stack is retained.
    - Fix SW_AGENT_MAX_BUFFER_SIZE / SW_AGENT_TRACE_TIMEOUT env parsing (Number.isSafeInteger on a string is always false, so the overrides were dead code and always fell back to the defaults).

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]> --- src/agent/protocol/grpc/clients/HeartbeatClient.ts | 7 ++-- .../protocol/grpc/clients/TraceReportClient.ts | 21 +++++++--- src/config/AgentConfig.ts | 12 +++--- src/logging/index.ts | 45 ++++++++++++++++++++++ 4 files changed, 70 insertions(+), 15 deletions(-) diff --git a/src/agent/protocol/grpc/clients/HeartbeatClient.ts b/src/agent/protocol/grpc/clients/HeartbeatClient.ts index 91cda52..2c101a6 100755 --- a/src/agent/protocol/grpc/clients/HeartbeatClient.ts +++ b/src/agent/protocol/grpc/clients/HeartbeatClient.ts @@ -21,7 +21,7 @@ import * as grpc from '@grpc/grpc-js'; import { connectivityState } from '@grpc/grpc-js'; import * as packageInfo from '../../../../../package.json'; -import { createLogger } from '../../../../logging'; +import { createLogger, throttled } from '../../../../logging'; import Client from './Client'; import { ManagementServiceClient } from '../../../../proto/management/Management_grpc_pb'; import AuthInterceptor from '../AuthInterceptor'; @@ -31,6 +31,7 @@ import { KeyStringValuePair } from '../../../../proto/common/Common_pb'; import * as os from 'os'; const logger = createLogger(__filename); +const logHeartbeatError = throttled(logger, 'error', 30000); export default class HeartbeatClient implements Client { private readonly managementServiceClient: ManagementServiceClient; @@ -74,12 +75,12 @@ export default class HeartbeatClient implements Client { this.heartbeatTimer = setInterval(() => { this.managementServiceClient.reportInstanceProperties(instanceProperties, AuthInterceptor(), (error, _) => { if (error) { - logger.error('Failed to send heartbeat', error); + logHeartbeatError('Failed to send heartbeat', error); } }); this.managementServiceClient.keepAlive(keepAlivePkg, AuthInterceptor(), (error, _) => { if (error) { - logger.error('Failed to send heartbeat', error); + logHeartbeatError('Failed to send heartbeat', error); } }); }, 20000).unref(); diff --git 
a/src/agent/protocol/grpc/clients/TraceReportClient.ts b/src/agent/protocol/grpc/clients/TraceReportClient.ts index 8352604..4f5cfe7 100755 --- a/src/agent/protocol/grpc/clients/TraceReportClient.ts +++ b/src/agent/protocol/grpc/clients/TraceReportClient.ts @@ -20,7 +20,7 @@ import config from '../../../../config/AgentConfig'; import * as grpc from '@grpc/grpc-js'; import { connectivityState } from '@grpc/grpc-js'; -import { createLogger } from '../../../../logging'; +import { createLogger, throttled } from '../../../../logging'; import Client from './Client'; import { TraceSegmentReportServiceClient } from '../../../../proto/language-agent/Tracing_grpc_pb'; import AuthInterceptor from '../AuthInterceptor'; @@ -29,6 +29,8 @@ import { emitter } from '../../../../lib/EventEmitter'; import Segment from '../../../../trace/context/Segment'; const logger = createLogger(__filename); +const logReportError = throttled(logger, 'error', 30000); +const logBufferFull = throttled(logger, 'warn', 30000); export default class TraceReportClient implements Client { private readonly reporterClient: TraceSegmentReportServiceClient; @@ -46,10 +48,8 @@ export default class TraceReportClient implements Client { this.segmentFinishedListener = (segment: Segment) => { // Limit buffer size to prevent memory leak during network issues if (this.buffer.length >= config.maxBufferSize) { - logger.warn( - `Trace buffer reached maximum size (${config.maxBufferSize}). ` + - `Discarding oldest segment to prevent memory leak. ` + - `This may indicate network connectivity issues with the collector.` + logBufferFull( + `Trace buffer reached maximum size (${config.maxBufferSize}); discarding oldest segments. 
The collector at ${config.collectorAddress} is likely unreachable.`, ); this.buffer.shift(); // Remove oldest segment } @@ -75,12 +75,21 @@ export default class TraceReportClient implements Client { return; } + // Collector unreachable: keep the (bounded) buffer and let gRPC reconnect with its own exponential + // backoff, instead of failing a stream every tick and logging an error storm that exhausts the heap. + // The channel keeps trying to connect because `isConnected` polls getConnectivityState(true). + if (!this.isConnected) { + if (callback) callback(); + + return; + } + const stream = this.reporterClient.collect( AuthInterceptor(), { deadline: Date.now() + config.traceTimeout }, (error, _) => { if (error) { - logger.error('Failed to report trace data', error); + logReportError('Failed to report trace data', error); } if (callback) callback(); diff --git a/src/config/AgentConfig.ts b/src/config/AgentConfig.ts index ca33587..2ad5905 100644 --- a/src/config/AgentConfig.ts +++ b/src/config/AgentConfig.ts @@ -119,9 +119,9 @@ const _config = { collectorAddress: process.env.SW_AGENT_COLLECTOR_BACKEND_SERVICES || '127.0.0.1:11800', secure: process.env.SW_AGENT_SECURE?.toLowerCase() === 'true', authorization: process.env.SW_AGENT_AUTHENTICATION, - maxBufferSize: Number.isSafeInteger(process.env.SW_AGENT_MAX_BUFFER_SIZE) - ? Number.parseInt(process.env.SW_AGENT_MAX_BUFFER_SIZE as string, 10) - : 1000, + maxBufferSize: ((n) => (Number.isSafeInteger(n) && n > 0 ? n : 1000))( + Number.parseInt(process.env.SW_AGENT_MAX_BUFFER_SIZE ?? '', 10), + ), coldEndpoint: process.env.SW_COLD_ENDPOINT?.toLowerCase() === 'true', disablePlugins: process.env.SW_AGENT_DISABLE_PLUGINS || '', ignoreSuffix: process.env.SW_IGNORE_SUFFIX ?? 
'.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg', @@ -137,9 +137,9 @@ const _config = { reDisablePlugins: RegExp(''), // temporary placeholder so Typescript doesn't throw a fit reIgnoreOperation: RegExp(''), reHttpIgnoreMethod: RegExp(''), - traceTimeout: Number.isSafeInteger(process.env.SW_AGENT_TRACE_TIMEOUT) - ? Number.parseInt(process.env.SW_AGENT_TRACE_TIMEOUT as string, 10) - : 10 * 1000, + traceTimeout: ((n) => (Number.isSafeInteger(n) && n > 0 ? n : 10 * 1000))( + Number.parseInt(process.env.SW_AGENT_TRACE_TIMEOUT ?? '', 10), + ), }; export default _config; diff --git a/src/logging/index.ts b/src/logging/index.ts index 6ae1240..32db1aa 100644 --- a/src/logging/index.ts +++ b/src/logging/index.ts @@ -73,3 +73,48 @@ export function createLogger(name: string): LoggerLevelAware { return logger as LoggerLevelAware; } + +/** + * Wraps a logger method so it emits at most once per `intervalMs`, no matter how often it is called. + * + * When the SkyWalking backend is unreachable the report/heartbeat loops fail on every tick. Logging each + * failure with the full gRPC error (a multi-KB stack) lets the records accumulate in winston's internal + * stream buffer faster than the transport drains them, eventually exhausting the heap. This collapses a + * storm of identical failures into a single periodic line that carries the suppressed count, and reduces + * an Error to its `code`/`message` so no stack is retained. + */ +export function throttled( + logger: Logger, + level: 'error' | 'warn' | 'info', + intervalMs: number, +): (message: string, error?: unknown) => void { + let lastLoggedAt = 0; + let suppressed = 0; + + return (message, error) => { + const now = Date.now(); + + if (now - lastLoggedAt < intervalMs) { + suppressed += 1; + return; + } + + const meta: Record<string, unknown> = {}; + + if (suppressed > 0) { + meta.suppressed = suppressed; + } + + if (error != null) { + meta.error = error instanceof Error ? 
error.message : error; + const code = (error as { code?: unknown }).code; + if (code !== undefined) { + meta.code = code; + } + } + + lastLoggedAt = now; + suppressed = 0; + logger[level](message, meta); + }; +}
