This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b36bfbe014 [INLONG-11995][Agent] The Agent supports parallel creation of Sender connections to the DataProxy, improving creation efficiency (#11996)
b36bfbe014 is described below

commit b36bfbe014f56d303bb097f2e4f9345d7e5b8a4e
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Sep 15 19:10:38 2025 +0800

    [INLONG-11995][Agent] The Agent supports parallel creation of Sender connections to the DataProxy, improving creation efficiency (#11996)
---
 .../agent/plugin/sinks/dataproxy/Sender.java       | 45 ++++++++++++++--------
 1 file changed, 30 insertions(+), 15 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
index 00a69f5a1b..a78230a28a 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
@@ -47,6 +47,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
@@ -214,23 +217,33 @@ public class Sender {
         proxyClientConfig.setEnableDataCompress(isCompress);
         SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + 
sourcePath,
                 Thread.currentThread().isDaemon());
-        boolean hasError = false;
-        ProcessResult procResult = null;
+        int ioThreadNum = 100;
+        ExecutorService createExecutor = 
Executors.newFixedThreadPool(ioThreadNum);
+        CountDownLatch latch = new CountDownLatch(maxSenderPerGroup);
         for (int i = 0; i < maxSenderPerGroup; i++) {
-            InLongTcpMsgSender sender = new 
InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY);
-            procResult = new ProcessResult();
-            if (!sender.start(procResult)) {
-                hasError = true;
-                break;
-            }
-            senders.add(sender);
-        }
-        if (hasError) {
-            senders.forEach(sender -> {
-                sender.close();
+            createExecutor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        InLongTcpMsgSender sender = new 
InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY);
+                        ProcessResult procResult = new ProcessResult();
+                        if (!sender.start(procResult)) {
+                            return;
+                        }
+                        synchronized (senders) {
+                            senders.add(sender);
+                        }
+                    } catch (Throwable t) {
+                        LOGGER.error(t.getMessage(), t);
+                    } finally {
+                        latch.countDown();
+                    }
+                }
             });
-            throw new ProxySdkException("Start sender failure, " + procResult);
         }
+        latch.await();
+        createExecutor.shutdown();
     }
 
     public void sendBatch(SenderMessage message) {
@@ -361,6 +374,7 @@ public class Sender {
         private final int retry;
         private final SenderMessage message;
         private final int msgCnt;
+        private final long sendTime = System.currentTimeMillis();
 
         AgentSenderCallback(SenderMessage message, int retry) {
             this.message = message;
@@ -386,7 +400,8 @@ public class Sender {
                 
AgentStatusManager.sendDataLen.addAndGet(message.getTotalSize());
             } else {
                 LOGGER.error("send groupId {}, streamId {}, taskId {}, 
instanceId {}, dataTime {} fail with times {}, "
-                        + "error {}", groupId, streamId, taskId, instanceId, 
dataTime, retry, result);
+                        + "error {},duration:{},msgCnt:{},msgSize:{}", 
groupId, streamId, taskId, instanceId, dataTime,
+                        retry, result, (System.currentTimeMillis() - 
sendTime), msgCnt, message.getTotalSize());
                 getMetricItem(groupId, 
streamId).pluginSendFailCount.addAndGet(msgCnt);
                 putInResendQueue(new AgentSenderCallback(message, retry));
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, groupId, 
streamId,

Reply via email to