This is an automated email from the ASF dual-hosted git repository.

doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c0fec288f [INLONG-12064][Audit] Static ScheduledExecutorService in PulsarSink causes ClassLoader leaks and shared state corruption (#12085)
5c0fec288f is described below

commit 5c0fec288fe46155e98e29109f7c0d877217d030
Author: doleyzi <[email protected]>
AuthorDate: Thu Feb 26 17:08:16 2026 +0800

    [INLONG-12064][Audit] Static ScheduledExecutorService in PulsarSink causes ClassLoader leaks and shared state corruption (#12085)
    
    Co-authored-by: doleyzi <[email protected]>
---
 .../org/apache/inlong/audit/sink/PulsarSink.java   | 68 +++++++++++++++++-----
 1 file changed, 54 insertions(+), 14 deletions(-)

diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
index c99cb9ce16..c98fca78d1 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
@@ -47,6 +47,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -145,22 +147,16 @@ public class PulsarSink extends AbstractSink
 
     private static final PulsarPerformanceTask pulsarPerformanceTask = new PulsarPerformanceTask();
 
-    private static ScheduledExecutorService scheduledExecutorService = Executors
-            .newScheduledThreadPool(1, new HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
+    private static volatile ScheduledExecutorService scheduledExecutorService;
+
+    private static final AtomicBoolean schedulerStarted = new AtomicBoolean(false);
+
+    private static final AtomicInteger activeInstances = new AtomicInteger(0);
 
     private String topic;
 
     private Context context;
 
-    static {
-        /*
-         * stat pulsar performance
-         */
-        logger.info("init pulsarPerformanceTask");
-        scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask, 0L,
-                PRINT_INTERVAL, TimeUnit.SECONDS);
-    }
-
     public PulsarSink() {
         super();
         logger.debug("new instance of PulsarSink!");
@@ -199,6 +195,46 @@ public class PulsarSink extends AbstractSink
         }
     }
 
+    /**
+     * Start the performance scheduler if not already started.
+     * Uses compareAndSet to ensure the scheduler is only initialized once across all instances.
+     */
+    private static synchronized void startPerformanceScheduler() {
+        if (!schedulerStarted.compareAndSet(false, true)) {
+            return;
+        }
+        scheduledExecutorService = Executors.newScheduledThreadPool(1,
+                new HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
+        scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask, PRINT_INTERVAL,
+                PRINT_INTERVAL, TimeUnit.SECONDS);
+        logger.info("PulsarPerformanceTask scheduler started");
+    }
+
+    /**
+     * Stop the performance scheduler when the last active instance is stopped.
+     * Uses reference counting to ensure the scheduler is only shut down
+     * when no more active instances remain.
+     */
+    private static synchronized void stopPerformanceScheduler() {
+        if (activeInstances.get() <= 0) {
+            logger.warn("No active instances to stop, skipping scheduler shutdown");
+            return;
+        }
+
+        if (activeInstances.decrementAndGet() > 0) {
+            logger.info("Still have active instances, not shutting down scheduler");
+            return;
+        }
+
+        if (scheduledExecutorService != null && !scheduledExecutorService.isShutdown()) {
+            logger.info("Shutting down pulsarPerformanceTask scheduler");
+            scheduledExecutorService.shutdownNow();
+            scheduledExecutorService = null;
+        }
+        schedulerStarted.set(false);
+        logger.info("PulsarPerformanceTask scheduler stopped");
+    }
+
     private void initTopic() throws Exception {
         long startTime = System.currentTimeMillis();
         if (topic != null) {
@@ -230,6 +266,10 @@ public class PulsarSink extends AbstractSink
                     + i);
             sinkThreadPool[i].start();
         }
+
+        activeInstances.incrementAndGet();
+        startPerformanceScheduler();
+
         logger.debug("meta sink started");
     }
 
@@ -258,9 +298,9 @@ public class PulsarSink extends AbstractSink
             sinkThreadPool = null;
         }
         super.stop();
-        if (!scheduledExecutorService.isShutdown()) {
-            scheduledExecutorService.shutdown();
-        }
+
+        stopPerformanceScheduler();
+
         sinkCounter.stop();
         logger.debug("pulsar sink stopped. Metrics:{}", sinkCounter);
     }

Reply via email to