This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c9deaa538b [ISSUE #9377] fix 'send trace data can fail if shutdown producer immediately' (#9378)
c9deaa538b is described below

commit c9deaa538be891870e40dec2b556207c0bdfb9d0
Author: kaikoo <[email protected]>
AuthorDate: Wed Apr 30 15:15:59 2025 +0800

    [ISSUE #9377] fix 'send trace data can fail if shutdown producer immediately' (#9378)
---
 .../org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index ece75514e1..c76fea7492 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -54,6 +55,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private static final Logger log = 
LoggerFactory.getLogger(AsyncTraceDispatcher.class);
     private static final AtomicInteger COUNTER = new AtomicInteger();
     private static final AtomicInteger INSTANCE_NUM = new AtomicInteger(0);
+    private static final long WAIT_FOR_SHUTDOWN = 5000L;
     private volatile boolean stopped = false;
     private final int traceInstanceId = INSTANCE_NUM.getAndIncrement();
     private final int batchNum;
@@ -190,23 +192,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
 
     @Override
     public void flush() {
-        long end = System.currentTimeMillis() + 500;
-        while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && 
System.currentTimeMillis() <= end) {
+        while (traceContextQueue.size() > 0) {
             try {
                 flushTraceContext(true);
             } catch (Throwable throwable) {
                 log.error("flushTraceContext error", throwable);
             }
         }
-        if (appenderQueue.size() > 0) {
-            log.error("There are still some traces that haven't been sent " + 
traceContextQueue.size() + "   " + appenderQueue.size());
-        }
     }
 
     @Override
     public void shutdown() {
         flush();
-        this.traceExecutor.shutdown();
+        ThreadUtils.shutdownGracefully(this.traceExecutor, WAIT_FOR_SHUTDOWN, 
TimeUnit.MILLISECONDS);
         if (isStarted.get()) {
             traceProducer.shutdown();
         }

Reply via email to