drpmma commented on code in PR #8430:
URL: https://github.com/apache/rocketmq/pull/8430#discussion_r1687528383


##########
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java:
##########
@@ -44,55 +44,55 @@
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.RPCHook;
 
 import static 
org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
 
 public class AsyncTraceDispatcher implements TraceDispatcher {
     private final static Logger log = 
LoggerFactory.getLogger(AsyncTraceDispatcher.class);
     private final static AtomicInteger COUNTER = new AtomicInteger();
     private final static short MAX_MSG_KEY_SIZE = Short.MAX_VALUE - 10000;
+    private static final AtomicInteger INSTANCE_NUM = new AtomicInteger(0);
+    private final int traceInstanceId = INSTANCE_NUM.getAndIncrement();
     private final int queueSize;
     private final int batchSize;
     private final int maxMsgSize;
     private final long pollingTimeMil;
     private final long waitTimeThresholdMil;
     private final DefaultMQProducer traceProducer;
-    private final ThreadPoolExecutor traceExecutor;
-    // The last discard number of log
     private AtomicLong discardCount;
     private Thread worker;
+    private final ThreadPoolExecutor traceExecutor;
+
+    private final int threadNum = Math.max(8, 
Runtime.getRuntime().availableProcessors());
     private final ArrayBlockingQueue<TraceContext> traceContextQueue;
-    private final HashMap<String, TraceDataSegment> taskQueueByTopic;
+    //    private final HashMap<String, TraceDataSegment> taskQueueByTopic;
     private ArrayBlockingQueue<Runnable> appenderQueue;
     private volatile Thread shutDownHook;
     private volatile boolean stopped = false;
     private DefaultMQProducerImpl hostProducer;
     private DefaultMQPushConsumerImpl hostConsumer;
     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
-    private String dispatcherId = UUID.randomUUID().toString();
     private volatile String traceTopicName;
     private AtomicBoolean isStarted = new AtomicBoolean(false);
     private volatile AccessChannel accessChannel = AccessChannel.LOCAL;
     private String group;
     private Type type;
     private String namespaceV2;
 
-    public AsyncTraceDispatcher(String group, Type type, String 
traceTopicName, RPCHook rpcHook) {
+    public AsyncTraceDispatcher(String group, Type type, int batchSize, String 
traceTopicName, RPCHook rpcHook) {
         // queueSize is greater than or equal to the n power of 2 of value
         this.queueSize = 2048;
-        this.batchSize = 100;
+        this.batchSize = Math.min(batchSize, 20);

Review Comment:
   It's recommended to add a code comment explaining why `Math.min` is used here, i.e. why `batchSize` is capped at 20.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to