This is an automated email from the ASF dual-hosted git repository.

wu-sheng pushed a commit to branch fix/oom-when-collector-unreachable
in repository https://gitbox.apache.org/repos/asf/skywalking-nodejs.git

commit be688d2e6dee02c5d984073038b540029febeeba
Author: Wu Sheng <[email protected]>
AuthorDate: Tue Jun 9 10:24:29 2026 +0800

    fix: prevent OOM when SkyWalking collector is unreachable
    
    When the collector is down, the trace report loop fired every second and
    each gRPC failure logged a multi-KB error (with full stack) that piled up
    in winston's internal stream buffer faster than the transport drained it,
    eventually exhausting the heap (apache/skywalking#13764).
    
    - Skip the report attempt while the gRPC channel is not READY; gRPC-js
      reconnects with its own backoff, and the bounded buffer is retained
      instead of failing a stream and dropping data every tick.
    - Throttle and slim failure logs in the Trace and Heartbeat clients: at
      most one line per 30s carrying a suppressed count, reduced to the error
      code/message so no stack is retained.
    - Fix SW_AGENT_MAX_BUFFER_SIZE / SW_AGENT_TRACE_TIMEOUT env parsing
      (Number.isSafeInteger on a string is always false, so the overrides were
      dead code and always fell back to the defaults).
    
    Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
---
 src/agent/protocol/grpc/clients/HeartbeatClient.ts |  7 ++--
 .../protocol/grpc/clients/TraceReportClient.ts     | 21 +++++++---
 src/config/AgentConfig.ts                          | 12 +++---
 src/logging/index.ts                               | 45 ++++++++++++++++++++++
 4 files changed, 70 insertions(+), 15 deletions(-)

diff --git a/src/agent/protocol/grpc/clients/HeartbeatClient.ts b/src/agent/protocol/grpc/clients/HeartbeatClient.ts
index 91cda52..2c101a6 100755
--- a/src/agent/protocol/grpc/clients/HeartbeatClient.ts
+++ b/src/agent/protocol/grpc/clients/HeartbeatClient.ts
@@ -21,7 +21,7 @@ import * as grpc from '@grpc/grpc-js';
 import { connectivityState } from '@grpc/grpc-js';
 
 import * as packageInfo from '../../../../../package.json';
-import { createLogger } from '../../../../logging';
+import { createLogger, throttled } from '../../../../logging';
 import Client from './Client';
 import { ManagementServiceClient } from '../../../../proto/management/Management_grpc_pb';
 import AuthInterceptor from '../AuthInterceptor';
@@ -31,6 +31,7 @@ import { KeyStringValuePair } from '../../../../proto/common/Common_pb';
 import * as os from 'os';
 
 const logger = createLogger(__filename);
+const logHeartbeatError = throttled(logger, 'error', 30000);
 
 export default class HeartbeatClient implements Client {
   private readonly managementServiceClient: ManagementServiceClient;
@@ -74,12 +75,12 @@ export default class HeartbeatClient implements Client {
     this.heartbeatTimer = setInterval(() => {
       this.managementServiceClient.reportInstanceProperties(instanceProperties, AuthInterceptor(), (error, _) => {
         if (error) {
-          logger.error('Failed to send heartbeat', error);
+          logHeartbeatError('Failed to send heartbeat', error);
         }
       });
       this.managementServiceClient.keepAlive(keepAlivePkg, AuthInterceptor(), (error, _) => {
         if (error) {
-          logger.error('Failed to send heartbeat', error);
+          logHeartbeatError('Failed to send heartbeat', error);
         }
       });
     }, 20000).unref();
diff --git a/src/agent/protocol/grpc/clients/TraceReportClient.ts b/src/agent/protocol/grpc/clients/TraceReportClient.ts
index 8352604..4f5cfe7 100755
--- a/src/agent/protocol/grpc/clients/TraceReportClient.ts
+++ b/src/agent/protocol/grpc/clients/TraceReportClient.ts
@@ -20,7 +20,7 @@
 import config from '../../../../config/AgentConfig';
 import * as grpc from '@grpc/grpc-js';
 import { connectivityState } from '@grpc/grpc-js';
-import { createLogger } from '../../../../logging';
+import { createLogger, throttled } from '../../../../logging';
 import Client from './Client';
 import { TraceSegmentReportServiceClient } from '../../../../proto/language-agent/Tracing_grpc_pb';
 import AuthInterceptor from '../AuthInterceptor';
@@ -29,6 +29,8 @@ import { emitter } from '../../../../lib/EventEmitter';
 import Segment from '../../../../trace/context/Segment';
 
 const logger = createLogger(__filename);
+const logReportError = throttled(logger, 'error', 30000);
+const logBufferFull = throttled(logger, 'warn', 30000);
 
 export default class TraceReportClient implements Client {
   private readonly reporterClient: TraceSegmentReportServiceClient;
@@ -46,10 +48,8 @@ export default class TraceReportClient implements Client {
     this.segmentFinishedListener = (segment: Segment) => {
       // Limit buffer size to prevent memory leak during network issues
       if (this.buffer.length >= config.maxBufferSize) {
-        logger.warn(
-          `Trace buffer reached maximum size (${config.maxBufferSize}). ` +
-          `Discarding oldest segment to prevent memory leak. ` +
-          `This may indicate network connectivity issues with the collector.`
+        logBufferFull(
+          `Trace buffer reached maximum size (${config.maxBufferSize}); discarding oldest segments. The collector at ${config.collectorAddress} is likely unreachable.`,
         );
         this.buffer.shift(); // Remove oldest segment
       }
@@ -75,12 +75,21 @@ export default class TraceReportClient implements Client {
         return;
       }
 
+      // Collector unreachable: keep the (bounded) buffer and let gRPC reconnect with its own exponential
+      // backoff, instead of failing a stream every tick and logging an error storm that exhausts the heap.
+      // The channel keeps trying to connect because `isConnected` polls getConnectivityState(true).
+      if (!this.isConnected) {
+        if (callback) callback();
+
+        return;
+      }
+
       const stream = this.reporterClient.collect(
         AuthInterceptor(),
         { deadline: Date.now() + config.traceTimeout },
         (error, _) => {
           if (error) {
-            logger.error('Failed to report trace data', error);
+            logReportError('Failed to report trace data', error);
           }
 
           if (callback) callback();
diff --git a/src/config/AgentConfig.ts b/src/config/AgentConfig.ts
index ca33587..2ad5905 100644
--- a/src/config/AgentConfig.ts
+++ b/src/config/AgentConfig.ts
@@ -119,9 +119,9 @@ const _config = {
   collectorAddress: process.env.SW_AGENT_COLLECTOR_BACKEND_SERVICES || '127.0.0.1:11800',
   secure: process.env.SW_AGENT_SECURE?.toLowerCase() === 'true',
   authorization: process.env.SW_AGENT_AUTHENTICATION,
-  maxBufferSize: Number.isSafeInteger(process.env.SW_AGENT_MAX_BUFFER_SIZE)
-    ? Number.parseInt(process.env.SW_AGENT_MAX_BUFFER_SIZE as string, 10)
-    : 1000,
+  maxBufferSize: ((n) => (Number.isSafeInteger(n) && n > 0 ? n : 1000))(
+    Number.parseInt(process.env.SW_AGENT_MAX_BUFFER_SIZE ?? '', 10),
+  ),
   coldEndpoint: process.env.SW_COLD_ENDPOINT?.toLowerCase() === 'true',
   disablePlugins: process.env.SW_AGENT_DISABLE_PLUGINS || '',
   ignoreSuffix: process.env.SW_IGNORE_SUFFIX ?? '.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg',
@@ -137,9 +137,9 @@ const _config = {
   reDisablePlugins: RegExp(''), // temporary placeholder so Typescript doesn't throw a fit
   reIgnoreOperation: RegExp(''),
   reHttpIgnoreMethod: RegExp(''),
-  traceTimeout: Number.isSafeInteger(process.env.SW_AGENT_TRACE_TIMEOUT)
-    ? Number.parseInt(process.env.SW_AGENT_TRACE_TIMEOUT as string, 10)
-    : 10 * 1000,
+  traceTimeout: ((n) => (Number.isSafeInteger(n) && n > 0 ? n : 10 * 1000))(
+    Number.parseInt(process.env.SW_AGENT_TRACE_TIMEOUT ?? '', 10),
+  ),
 };
 
 export default _config;
diff --git a/src/logging/index.ts b/src/logging/index.ts
index 6ae1240..32db1aa 100644
--- a/src/logging/index.ts
+++ b/src/logging/index.ts
@@ -73,3 +73,48 @@ export function createLogger(name: string): LoggerLevelAware {
 
   return logger as LoggerLevelAware;
 }
+
+/**
+ * Wraps a logger method so it emits at most once per `intervalMs`, no matter how often it is called.
+ *
+ * When the SkyWalking backend is unreachable the report/heartbeat loops fail on every tick. Logging each
+ * failure with the full gRPC error (a multi-KB stack) lets the records accumulate in winston's internal
+ * stream buffer faster than the transport drains them, eventually exhausting the heap. This collapses a
+ * storm of identical failures into a single periodic line that carries the suppressed count, and reduces
+ * an Error to its `code`/`message` so no stack is retained.
+ */
+export function throttled(
+  logger: Logger,
+  level: 'error' | 'warn' | 'info', // name of the logger method invoked for each emitted line
+  intervalMs: number, // minimum gap, in milliseconds, between two emitted lines
+): (message: string, error?: unknown) => void {
+  let lastLoggedAt = 0; // Date.now() value of the last emitted line; starts at 0 so the first call is emitted for any realistic interval
+  let suppressed = 0; // number of calls dropped since the last emitted line
+
+  return (message, error) => {
+    const now = Date.now();
+
+    if (now - lastLoggedAt < intervalMs) { // still inside the quiet window: count the call and drop it
+      suppressed += 1;
+      return;
+    }
+
+    const meta: Record<string, unknown> = {};
+
+    if (suppressed > 0) { // attach the dropped-call count only when something was actually dropped
+      meta.suppressed = suppressed;
+    }
+
+    if (error != null) {
+      meta.error = error instanceof Error ? error.message : error; // an Error is reduced to its message (no stack); any other value is passed through unchanged
+      const code = (error as { code?: unknown }).code; // presumably the gRPC status code when the error carries one - confirm against callers
+      if (code !== undefined) {
+        meta.code = code;
+      }
+    }
+
+    lastLoggedAt = now; // start a new quiet window at this emission
+    suppressed = 0;
+    logger[level](message, meta);
+  };
+}

Reply via email to