This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b36bfbe014 [INLONG-11995][Agent] The Agent supports parallel creation
of Sender connections to the DataProxy, improving creation efficiency (#11996)
b36bfbe014 is described below
commit b36bfbe014f56d303bb097f2e4f9345d7e5b8a4e
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Sep 15 19:10:38 2025 +0800
[INLONG-11995][Agent] The Agent supports parallel creation of Sender
connections to the DataProxy, improving creation efficiency (#11996)
---
.../agent/plugin/sinks/dataproxy/Sender.java | 45 ++++++++++++++--------
1 file changed, 30 insertions(+), 15 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
index 00a69f5a1b..a78230a28a 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
@@ -47,6 +47,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@@ -214,23 +217,33 @@ public class Sender {
proxyClientConfig.setEnableDataCompress(isCompress);
SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" +
sourcePath,
Thread.currentThread().isDaemon());
- boolean hasError = false;
- ProcessResult procResult = null;
+ int ioThreadNum = 100;
+ ExecutorService createExecutor =
Executors.newFixedThreadPool(ioThreadNum);
+ CountDownLatch latch = new CountDownLatch(maxSenderPerGroup);
for (int i = 0; i < maxSenderPerGroup; i++) {
- InLongTcpMsgSender sender = new
InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY);
- procResult = new ProcessResult();
- if (!sender.start(procResult)) {
- hasError = true;
- break;
- }
- senders.add(sender);
- }
- if (hasError) {
- senders.forEach(sender -> {
- sender.close();
+ createExecutor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ InLongTcpMsgSender sender = new
InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY);
+ ProcessResult procResult = new ProcessResult();
+ if (!sender.start(procResult)) {
+ return;
+ }
+ synchronized (senders) {
+ senders.add(sender);
+ }
+ } catch (Throwable t) {
+ LOGGER.error(t.getMessage(), t);
+ } finally {
+ latch.countDown();
+ }
+ }
});
- throw new ProxySdkException("Start sender failure, " + procResult);
}
+ latch.await();
+ createExecutor.shutdown();
}
public void sendBatch(SenderMessage message) {
@@ -361,6 +374,7 @@ public class Sender {
private final int retry;
private final SenderMessage message;
private final int msgCnt;
+ private final long sendTime = System.currentTimeMillis();
AgentSenderCallback(SenderMessage message, int retry) {
this.message = message;
@@ -386,7 +400,8 @@ public class Sender {
AgentStatusManager.sendDataLen.addAndGet(message.getTotalSize());
} else {
LOGGER.error("send groupId {}, streamId {}, taskId {},
instanceId {}, dataTime {} fail with times {}, "
- + "error {}", groupId, streamId, taskId, instanceId,
dataTime, retry, result);
+ + "error {},duration:{},msgCnt:{},msgSize:{}",
groupId, streamId, taskId, instanceId, dataTime,
+ retry, result, (System.currentTimeMillis() -
sendTime), msgCnt, message.getTotalSize());
getMetricItem(groupId,
streamId).pluginSendFailCount.addAndGet(msgCnt);
putInResendQueue(new AgentSenderCallback(message, retry));
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, groupId,
streamId,