This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c9deaa538b [ISSUE #9377] fix 'send trace data can fail if shutdown
producer immediately' (#9378)
c9deaa538b is described below
commit c9deaa538be891870e40dec2b556207c0bdfb9d0
Author: kaikoo <[email protected]>
AuthorDate: Wed Apr 30 15:15:59 2025 +0800
[ISSUE #9377] fix 'send trace data can fail if shutdown producer
immediately' (#9378)
---
.../org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index ece75514e1..c76fea7492 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
@@ -54,6 +55,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private static final Logger log =
LoggerFactory.getLogger(AsyncTraceDispatcher.class);
private static final AtomicInteger COUNTER = new AtomicInteger();
private static final AtomicInteger INSTANCE_NUM = new AtomicInteger(0);
+ private static final long WAIT_FOR_SHUTDOWN = 5000L;
private volatile boolean stopped = false;
private final int traceInstanceId = INSTANCE_NUM.getAndIncrement();
private final int batchNum;
@@ -190,23 +192,19 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
@Override
public void flush() {
- long end = System.currentTimeMillis() + 500;
- while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 &&
System.currentTimeMillis() <= end) {
+ while (traceContextQueue.size() > 0) {
try {
flushTraceContext(true);
} catch (Throwable throwable) {
log.error("flushTraceContext error", throwable);
}
}
- if (appenderQueue.size() > 0) {
- log.error("There are still some traces that haven't been sent " +
traceContextQueue.size() + " " + appenderQueue.size());
- }
}
@Override
public void shutdown() {
flush();
- this.traceExecutor.shutdown();
+ ThreadUtils.shutdownGracefully(this.traceExecutor, WAIT_FOR_SHUTDOWN,
TimeUnit.MILLISECONDS);
if (isStarted.get()) {
traceProducer.shutdown();
}