This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bd806e9fc [INLONG-10365][Audit] Optimizing TCP sticky packets may 
lead to duplication of audit data  (#10366)
4bd806e9fc is described below

commit 4bd806e9fcd02086ad5719c8ff100eb253bffece
Author: doleyzi <[email protected]>
AuthorDate: Thu Jun 6 20:34:39 2024 +0800

    [INLONG-10365][Audit] Optimizing TCP sticky packets may lead to duplication 
of audit data  (#10366)
    
    * Optimizing TCP sticky packets may lead to duplication of audit data
    
    * Remove useless code
    
    * Added tcp packet return judgment
    
    * update test case
    
    * update test case
    
    * Optimizing TCP sticky packets may lead to duplication of audit data
    
    * use HashSet instead of List
    
    * Use the interface Set instead of the specific implementation
---
 .../org/apache/inlong/audit/AuditReporterImpl.java |  63 ++--
 .../AuditMetric.java}                              |  36 ++-
 .../org/apache/inlong/audit/send/ProxyManager.java |  65 ++++
 .../apache/inlong/audit/send/SenderChannel.java    | 215 ------------
 .../org/apache/inlong/audit/send/SenderGroup.java  | 270 ----------------
 .../apache/inlong/audit/send/SenderHandler.java    | 110 -------
 .../apache/inlong/audit/send/SenderManager.java    | 360 +++++++++------------
 .../org/apache/inlong/audit/util/AuditConfig.java  |  36 +--
 .../java/org/apache/inlong/audit/util/Decoder.java |  57 ----
 .../apache/inlong/audit/util/EventLoopUtil.java    | 118 -------
 .../{SenderResult.java => RequestIdUtils.java}     |  35 +-
 ...enderManagerTest.java => ProxyManagerTest.java} |  42 ++-
 .../apache/inlong/audit/send/SenderGroupTest.java  |  53 ---
 .../inlong/audit/send/SenderManagerTest.java       |  15 +-
 .../inlong/audit/util/RequestIdUtilsTest.java}     |  26 +-
 15 files changed, 338 insertions(+), 1163 deletions(-)

diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index da3a1144e5..745821b512 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -18,15 +18,18 @@
 package org.apache.inlong.audit;
 
 import org.apache.inlong.audit.entity.AuditInformation;
+import org.apache.inlong.audit.entity.AuditMetric;
 import org.apache.inlong.audit.entity.FlowType;
 import org.apache.inlong.audit.loader.SocketAddressListLoader;
 import org.apache.inlong.audit.protocol.AuditApi;
+import org.apache.inlong.audit.send.ProxyManager;
 import org.apache.inlong.audit.send.SenderManager;
 import org.apache.inlong.audit.util.AuditConfig;
 import org.apache.inlong.audit.util.AuditDimensions;
 import org.apache.inlong.audit.util.AuditManagerUtils;
 import org.apache.inlong.audit.util.AuditValues;
 import org.apache.inlong.audit.util.Config;
+import org.apache.inlong.audit.util.RequestIdUtils;
 import org.apache.inlong.audit.util.StatInfo;
 
 import org.apache.commons.lang3.ClassUtils;
@@ -35,12 +38,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.StringJoiner;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -61,7 +64,7 @@ public class AuditReporterImpl implements Serializable {
     private static final int BATCH_NUM = 100;
     private static final int PERIOD = 1000 * 60;
     // Resource isolation key is used in checkpoint, the default value is 0.
-    private static final long DEFAULT_ISOLATE_KEY = 0;
+    public static final long DEFAULT_ISOLATE_KEY = 0;
     private final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
     private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>> 
preStatMap =
             new ConcurrentHashMap<>();
@@ -69,7 +72,7 @@ public class AuditReporterImpl implements Serializable {
             new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>> 
expiredStatMap =
             new ConcurrentHashMap<>();
-    private final ConcurrentHashMap<Long, List<String>> expiredKeyList = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Long, HashSet<String>> expiredKeyList = 
new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Long, Long> flushTime = new 
ConcurrentHashMap<>();
     private final Config config = new Config();
     private final ScheduledExecutorService timeoutExecutor = 
Executors.newSingleThreadScheduledExecutor();
@@ -83,6 +86,8 @@ public class AuditReporterImpl implements Serializable {
     private int flushStatThreshold = 100;
     private boolean autoFlush = true;
 
+    private AuditMetric auditMetric = new AuditMetric();
+
     /**
      * Set stat threshold
      *
@@ -116,12 +121,12 @@ public class AuditReporterImpl implements Serializable {
             public void run() {
                 try {
                     loadIpPortList();
+                    checkFlushTime();
                     if (autoFlush) {
                         flush(DEFAULT_ISOLATE_KEY);
                     }
-                    checkFlushTime();
                 } catch (Exception e) {
-                    LOGGER.error(e.getMessage());
+                    LOGGER.error("Audit run has exception!", e);
                 }
             }
 
@@ -190,7 +195,7 @@ public class AuditReporterImpl implements Serializable {
                 init();
                 initialized = true;
             }
-            this.manager.setAuditProxy(ipPortList);
+            ProxyManager.getInstance().setAuditProxy(ipPortList);
         } catch (InterruptedException e) {
             LOGGER.error(e.getMessage());
         } finally {
@@ -285,7 +290,7 @@ public class AuditReporterImpl implements Serializable {
      * Flush audit data by default audit version
      */
     public synchronized void flush() {
-        flush(DEFAULT_AUDIT_VERSION);
+        flush(DEFAULT_ISOLATE_KEY);
     }
 
     /**
@@ -296,11 +301,10 @@ public class AuditReporterImpl implements Serializable {
                 || flushStat.addAndGet(1) > flushStatThreshold) {
             return;
         }
+        long startTime = System.currentTimeMillis();
         LOGGER.info("Audit flush isolate key {} ", isolateKey);
-        manager.clearBuffer();
+        manager.checkFailedData();
         resetStat();
-        LOGGER.info("pre stat map size {} {} {} {}", this.preStatMap.size(), 
this.expiredStatMap.size(),
-                this.summaryStatMap.size(), this.expiredKeyList.size());
 
         summaryExpiredStatMap(isolateKey);
 
@@ -322,7 +326,13 @@ public class AuditReporterImpl implements Serializable {
 
         clearExpiredKey(isolateKey);
 
-        LOGGER.info("Finish report audit data");
+        manager.closeSocket();
+
+        LOGGER.info("Success report {} package, Failed report {} package, 
total {} message, cost: {} ms",
+                auditMetric.getSuccessPack(), auditMetric.getFailedPack(), 
auditMetric.getTotalMsg(),
+                System.currentTimeMillis() - startTime);
+
+        auditMetric.reset();
     }
 
     /**
@@ -331,7 +341,11 @@ public class AuditReporterImpl implements Serializable {
     private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
         AuditApi.BaseCommand.Builder baseCommand = 
AuditApi.BaseCommand.newBuilder();
         
baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
-        manager.send(baseCommand.build(), auditRequest);
+        if (manager.send(baseCommand.build(), auditRequest)) {
+            auditMetric.addSuccessPack(1);
+        } else {
+            auditMetric.addFailedPack(1);
+        }
     }
 
     /**
@@ -385,16 +399,14 @@ public class AuditReporterImpl implements Serializable {
      * Summary pre stat map
      */
     private void summaryPreStatMap(long isolateKey, ConcurrentHashMap<String, 
StatInfo> statInfo) {
-        List<String> expiredKeys = 
this.expiredKeyList.computeIfAbsent(isolateKey, k -> new ArrayList<>());
+        Set<String> expiredKeys = 
this.expiredKeyList.computeIfAbsent(isolateKey, k -> new HashSet<>());
 
         for (Map.Entry<String, StatInfo> entry : statInfo.entrySet()) {
             String key = entry.getKey();
             StatInfo value = entry.getValue();
             // If there is no data, enter the list to be eliminated
             if (value.count.get() == 0) {
-                if (!expiredKeys.contains(key)) {
-                    expiredKeys.add(key);
-                }
+                expiredKeys.add(key);
                 continue;
             }
             sumThreadGroup(isolateKey, key, value);
@@ -405,10 +417,10 @@ public class AuditReporterImpl implements Serializable {
      * Clear expired key
      */
     private void clearExpiredKey(long isolateKey) {
-        Iterator<Map.Entry<Long, List<String>>> iterator =
+        Iterator<Map.Entry<Long, HashSet<String>>> iterator =
                 this.expiredKeyList.entrySet().iterator();
         while (iterator.hasNext()) {
-            Map.Entry<Long, List<String>> entry = iterator.next();
+            Map.Entry<Long, HashSet<String>> entry = iterator.next();
             if (entry.getValue().isEmpty()) {
                 LOGGER.info("Remove the key of expired key list: {},isolate 
key: {}", entry.getKey(), isolateKey);
                 iterator.remove();
@@ -451,7 +463,7 @@ public class AuditReporterImpl implements Serializable {
                 .setSdkTs(sdkTime).setPacketId(packageId)
                 .build();
         AuditApi.AuditRequest.Builder requestBuild = 
AuditApi.AuditRequest.newBuilder();
-        
requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId());
+        requestBuild.setMsgHeader(msgHeader);
         // Process the stat info for all threads
         for (Map.Entry<String, StatInfo> entry : 
summaryStatMap.get(isolateKey).entrySet()) {
             // Entry key order: logTime inlongGroupID inlongStreamID auditID 
auditTag auditVersion
@@ -479,18 +491,23 @@ public class AuditReporterImpl implements Serializable {
             if (dataId++ >= BATCH_NUM) {
                 dataId = 0;
                 packageId++;
-                sendByBaseCommand(requestBuild.build());
-                requestBuild.clearMsgBody();
+                sendData(requestBuild);
             }
         }
 
         if (requestBuild.getMsgBodyCount() > 0) {
-            sendByBaseCommand(requestBuild.build());
-            requestBuild.clearMsgBody();
+            sendData(requestBuild);
         }
         summaryStatMap.get(isolateKey).clear();
     }
 
+    private void sendData(AuditApi.AuditRequest.Builder requestBuild) {
+        requestBuild.setRequestId(RequestIdUtils.nextRequestId());
+        sendByBaseCommand(requestBuild.build());
+        auditMetric.addTotalMsg(requestBuild.getMsgBodyCount());
+        requestBuild.clearMsgBody();
+    }
+
     /**
      * Check flush time
      */
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
similarity index 54%
copy from 
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
copy to 
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
index 233677ef59..000642bd64 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
@@ -15,24 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.send;
+package org.apache.inlong.audit.entity;
 
-import org.apache.inlong.audit.util.Decoder;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class AuditMetric {
 
-public class ClientPipelineFactory extends ChannelInitializer<SocketChannel> {
+    private Long successPack = 0L;
+    private Long failedPack = 0L;
+    private Long totalMsg = 0L;
 
-    private SenderManager senderManager;
+    public void addSuccessPack(long successPack) {
+        this.successPack += successPack;
+    }
+
+    public void addFailedPack(long failedPack) {
+        this.failedPack += failedPack;
+    }
 
-    public ClientPipelineFactory(SenderManager senderManager) {
-        this.senderManager = senderManager;
+    public void addTotalMsg(long totalMsg) {
+        this.totalMsg += totalMsg;
     }
 
-    @Override
-    public void initChannel(SocketChannel ch) throws Exception {
-        ch.pipeline().addLast("contentDecoder", new Decoder());
-        ch.pipeline().addLast("handler", new SenderHandler(senderManager));
+    public void reset() {
+        successPack = 0L;
+        failedPack = 0L;
+        totalMsg = 0L;
     }
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
new file mode 100644
index 0000000000..a3dfcd4e75
--- /dev/null
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class ProxyManager {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxyManager.class);
+    private static final ProxyManager instance = new ProxyManager();
+    private final List<String> currentIpPorts = new CopyOnWriteArrayList<>();
+
+    private ProxyManager() {
+    }
+
+    public static ProxyManager getInstance() {
+        return instance;
+    }
+
+    /**
+     * update config
+     */
+    public void setAuditProxy(HashSet<String> ipPortList) {
+        if (!ipPortList.equals(new HashSet<>(currentIpPorts))) {
+            currentIpPorts.clear();
+            currentIpPorts.addAll(ipPortList);
+        }
+    }
+
+    public InetSocketAddress getInetSocketAddress() {
+        if (currentIpPorts.isEmpty()) {
+            return null;
+        }
+        Random rand = new Random();
+        String randomElement = 
currentIpPorts.get(rand.nextInt(currentIpPorts.size()));
+        String[] ipPort = randomElement.split(":");
+        if (ipPort.length != 2) {
+            LOGGER.error("Invalid IP:Port format: {}", randomElement);
+            return null;
+        }
+        return new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
+    }
+}
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
deleted file mode 100644
index 8977221ec7..0000000000
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.send;
-
-import org.apache.inlong.audit.util.EventLoopUtil;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.Attribute;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-
-public class SenderChannel {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(SenderChannel.class);
-
-    public static final int DEFAULT_SEND_THREADNUM = 1;
-    public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
-    public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
-
-    private InetSocketAddress addr;
-    private Channel channel;
-    private String channelKey;
-    private Semaphore packToken;
-    private Bootstrap client;
-    private SenderManager senderManager;
-
-    /**
-     * Constructor
-     *
-     * @param addr
-     */
-    public SenderChannel(InetSocketAddress addr, int maxSynchRequest, 
SenderManager senderManager) {
-        this.addr = addr;
-        this.packToken = new Semaphore(maxSynchRequest);
-        this.senderManager = senderManager;
-    }
-
-    /**
-     * Try acquire channel
-     *
-     * @return
-     */
-    public boolean tryAcquire() {
-        return packToken.tryAcquire();
-    }
-
-    /**
-     * Try acquire channel
-     *
-     * @return
-     */
-    public boolean acquire() {
-        try {
-            packToken.acquire();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        return true;
-    }
-
-    /**
-     * release channel
-     */
-    public void release() {
-        packToken.release();
-    }
-
-    /**
-     * toString
-     */
-    @Override
-    public String toString() {
-        return addr.toString();
-    }
-
-    /**
-     * get addr
-     * @return the addr
-     */
-    public InetSocketAddress getAddr() {
-        return addr;
-    }
-
-    /**
-     * set addr
-     * @param addr the addr to set
-     */
-    public void setAddr(InetSocketAddress addr) {
-        this.addr = addr;
-    }
-
-    /**
-     * get channel
-     *
-     * @return the channel
-     */
-    public Channel getChannel() {
-        return channel;
-    }
-
-    /**
-     * set channel
-     *
-     * @param channel the channel to set
-     */
-    public void setChannel(Channel channel) {
-        this.channel = channel;
-        Attribute<String> attr = this.channel.attr(SenderGroup.CHANNEL_KEY);
-        attr.set(channelKey);
-    }
-
-    /**
-     * get channelKey
-     * @return the channelKey
-     */
-    public String getChannelKey() {
-        return channelKey;
-    }
-
-    /**
-     * set channelKey
-     * @param channelKey the channelKey to set
-     */
-    public void setChannelKey(String channelKey) {
-        this.channelKey = channelKey;
-    }
-
-    private void init() {
-        ThreadFactory selfDefineFactory = new 
DefaultThreadFactory("audit-client-io" + Thread.currentThread().getId(),
-                Thread.currentThread().isDaemon());
-
-        EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(DEFAULT_SEND_THREADNUM,
-                false, selfDefineFactory);
-        client = new Bootstrap();
-        client.group(eventLoopGroup);
-        
client.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
-        client.option(ChannelOption.SO_KEEPALIVE, true);
-        client.option(ChannelOption.TCP_NODELAY, false);
-        client.option(ChannelOption.SO_REUSEADDR, false);
-        client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
-        client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
-        client.handler(new ClientPipelineFactory(senderManager));
-    }
-
-    /**
-     * connect channel
-     *
-     * @return
-     */
-    public boolean connect() {
-        if (checkConnect(this.channel)) {
-            return true;
-        }
-        try {
-            if (client == null) {
-                init();
-            }
-
-            synchronized (client) {
-                ChannelFuture future = client.connect(this.addr).sync();
-                this.channel = future.channel();
-                Attribute<String> attr = 
this.channel.attr(SenderGroup.CHANNEL_KEY);
-                attr.set(channelKey);
-            }
-        } catch (Throwable e) {
-            LOG.error("connect {} failed. {}", this.getAddr(), e.getMessage());
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * check channeel connect
-     *
-     * @param channel
-     * @return
-     */
-    private boolean checkConnect(Channel channel) {
-        try {
-            if (channel == null) {
-                return false;
-            }
-            if (channel.isWritable() || channel.isOpen() || 
channel.isActive()) {
-                return true;
-            }
-        } catch (Throwable ex) {
-            LOG.error("check connect ex." + ex.getMessage());
-        }
-        return false;
-    }
-}
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
deleted file mode 100644
index 40e287bd77..0000000000
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.send;
-
-import org.apache.inlong.audit.util.SenderResult;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.util.Attribute;
-import io.netty.util.AttributeKey;
-import org.apache.commons.lang.math.NumberUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class SenderGroup {
-
-    public static final Logger LOG = 
LoggerFactory.getLogger(SenderGroup.class);
-    public static final AttributeKey<String> CHANNEL_KEY = 
AttributeKey.newInstance("channelKey");
-    // maximum number of sending
-    public static final int MAX_SEND_TIMES = 3;
-    public static final int DEFAULT_WAIT_TIMES = 10000;
-    public static final int WAIT_INTERVAL = 1;
-    public static final int DEFAULT_SYNCH_REQUESTS = 1;
-    public static final int RANDOM_MIN = 0;
-
-    private List<LinkedBlockingQueue<SenderChannel>> channelGroups = new 
ArrayList<>();
-    private int mIndex = 0;
-    private List<SenderChannel> deleteChannels = new ArrayList<>();
-    private ConcurrentHashMap<String, SenderChannel> totalChannels = new 
ConcurrentHashMap<>();
-
-    private int waitChannelTimes = DEFAULT_WAIT_TIMES;
-    private int waitChannelIntervalMs = WAIT_INTERVAL;
-    private int maxSynchRequest = DEFAULT_SYNCH_REQUESTS;
-    private boolean hasSendError = false;
-
-    private SenderManager senderManager;
-
-    private AtomicLong channelId = new AtomicLong(0);
-
-    /**
-     * constructor
-     *
-     * @param senderManager
-     */
-    public SenderGroup(SenderManager senderManager) {
-        this.senderManager = senderManager;
-        /*
-         * init add two list for update config
-         */
-        channelGroups.add(new LinkedBlockingQueue<>());
-        channelGroups.add(new LinkedBlockingQueue<>());
-    }
-
-    /**
-     * send data
-     *
-     * @param dataBuf
-     * @return
-     */
-    public SenderResult send(ByteBuf dataBuf) {
-        if (dataBuf == null) {
-            return new SenderResult("dataBuf is null", 0, false);
-        }
-        LinkedBlockingQueue<SenderChannel> channels = 
channelGroups.get(mIndex);
-        SenderChannel channel = null;
-        boolean dataBufReleased = false;
-        try {
-            if (channels.size() <= 0) {
-                LOG.error("channels is empty");
-                dataBuf.release();
-                dataBufReleased = true;
-                return new SenderResult("channels is empty", 0, false);
-            }
-            boolean isOk = false;
-            // tryAcquire
-            for (int tryIndex = 0; tryIndex < MAX_SEND_TIMES; tryIndex++) {
-                for (int i = 0; i < channels.size(); i++) {
-                    channel = channels.poll();
-                    if (channel.tryAcquire()) {
-                        if (channel.connect()) {
-                            isOk = true;
-                            break;
-                        }
-                        channel.release();
-                    }
-                    channels.offer(channel);
-                    channel = null;
-                }
-
-                if (isOk) {
-                    break;
-                }
-
-                try {
-                    Thread.sleep(waitChannelIntervalMs);
-                } catch (Throwable e) {
-                    LOG.error(e.getMessage());
-                }
-            }
-            // acquire
-            if (channel == null) {
-                for (int i = 0; i < channels.size(); i++) {
-                    channel = channels.poll();
-                    if (!channel.connect()) {
-                        channels.offer(channel);
-                        channel = null;
-                        continue;
-                    }
-                    if (channel.acquire()) {
-                        break;
-                    }
-                }
-            }
-            // error
-            if (channel == null) {
-                LOG.error("can not get a channel");
-                dataBuf.release();
-                dataBufReleased = true;
-                return new SenderResult("can not get a channel", 0, false);
-            }
-
-            ChannelFuture t = null;
-            if (channel.getChannel().isWritable()) {
-                t = channel.getChannel().writeAndFlush(dataBuf).sync().await();
-                if (!t.isSuccess()) {
-                    if (!channel.getChannel().isActive()) {
-                        channel.connect();
-                    }
-                    t = 
channel.getChannel().writeAndFlush(dataBuf).sync().await();
-                }
-                dataBufReleased = true;
-            } else {
-                dataBuf.release();
-                dataBufReleased = true;
-            }
-            return new SenderResult(channel.getAddr().getHostString(), 
channel.getAddr().getPort(), t.isSuccess());
-        } catch (Throwable ex) {
-            LOG.error(ex.getMessage(), ex);
-            this.setHasSendError(true);
-            return new SenderResult("127.0.0.1", 0, false);
-        } finally {
-            if (channel != null) {
-                channel.release();
-                channels.offer(channel);
-            }
-            if (!dataBufReleased) {
-                dataBuf.release();
-            }
-        }
-    }
-
-    public void release(Channel channel) {
-        Attribute<String> attr = channel.attr(CHANNEL_KEY);
-        String key = attr.get();
-        if (key == null) {
-            return;
-        }
-        SenderChannel senderChannel = this.totalChannels.get(key);
-        if (senderChannel != null) {
-            senderChannel.release();
-        }
-    }
-
-    /**
-     * update config
-     *
-     * @param ipLists
-     */
-    public void updateConfig(List<String> ipLists) {
-        try {
-            for (SenderChannel dc : deleteChannels) {
-                if (dc.getChannel() != null) {
-                    try {
-                        dc.getChannel().disconnect();
-                        dc.getChannel().close();
-                    } catch (Exception e) {
-                        LOG.error(e.getMessage(), e);
-                    }
-                }
-            }
-            deleteChannels.clear();
-            int newIndex = mIndex ^ 0x01;
-            LinkedBlockingQueue<SenderChannel> newChannels = 
this.channelGroups.get(newIndex);
-            newChannels.clear();
-            List<String> waitingDeleteChannelKey = new 
ArrayList<>(totalChannels.size());
-            waitingDeleteChannelKey.addAll(totalChannels.keySet());
-            for (String ipPort : ipLists) {
-                try {
-                    InetSocketAddress addr = parseAddress(ipPort);
-                    if (addr == null) {
-                        continue;
-                    }
-                    String key = String.valueOf(channelId.getAndIncrement());
-                    SenderChannel channel = new SenderChannel(addr, 
maxSynchRequest, senderManager);
-                    channel.setChannelKey(key);
-                    newChannels.add(channel);
-                    totalChannels.put(key, channel);
-                } catch (Exception e) {
-                    LOG.error(e.getMessage(), e);
-                }
-            }
-
-            waitingDeleteChannelKey.forEach(v -> 
deleteChannels.add(totalChannels.remove(v)));
-            this.mIndex = newIndex;
-        } catch (Throwable e) {
-            LOG.error("Update Sender Ip Failed." + e.getMessage(), e);
-        }
-    }
-
-    /**
-     * parseAddress
-     * 
-     * @param  InetSocketAddress
-     * @return
-     */
-    private static InetSocketAddress parseAddress(String ipPort) {
-        String[] splits = ipPort.split(":");
-        if (splits.length == 2) {
-            String strIp = splits[0];
-            String strPort = splits[1];
-            int port = NumberUtils.toInt(strPort, 0);
-            if (port > 0) {
-                return new InetSocketAddress(strIp, port);
-            }
-        }
-        return null;
-    }
-
-    /**
-     * get hasSendError
-     *
-     * @return the hasSendError
-     */
-    public boolean isHasSendError() {
-        return hasSendError;
-    }
-
-    /**
-     * set hasSendError
-     *
-     * @param hasSendError the hasSendError to set
-     */
-    public void setHasSendError(boolean hasSendError) {
-        this.hasSendError = hasSendError;
-    }
-
-}
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
deleted file mode 100644
index c5f5c481e9..0000000000
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.send;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-
-public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(SenderHandler.class);
-    private final SenderManager manager;
-
-    /**
-     * Constructor
-     */
-    public SenderHandler(SenderManager manager) {
-        this.manager = manager;
-    }
-
-    /**
-     * Message Received
-     */
-    @Override
-    public void channelRead0(io.netty.channel.ChannelHandlerContext ctx, 
byte[] e) {
-        try {
-            manager.release(ctx.channel());
-            manager.onMessageReceived(ctx, e);
-        } catch (Throwable ex) {
-            LOGGER.error("channelRead0 error: ", ex);
-        }
-    }
-
-    /**
-     * Caught exception
-     */
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
-        try {
-            manager.release(ctx.channel());
-            manager.onExceptionCaught(ctx, e);
-        } catch (Throwable ex) {
-            LOGGER.error("caught exception: ", ex);
-        }
-    }
-
-    /**
-     * Disconnected channel
-     */
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) {
-        try {
-            manager.release(ctx.channel());
-            super.channelInactive(ctx);
-        } catch (Throwable ex) {
-            LOGGER.error("channelInactive error: ", ex);
-        }
-    }
-
-    /**
-     * Closed channel
-     */
-    @Override
-    public void channelUnregistered(ChannelHandlerContext ctx) throws 
Exception {
-        try {
-            manager.release(ctx.channel());
-            super.channelUnregistered(ctx);
-        } catch (Throwable ex) {
-            LOGGER.error("channelUnregistered error: ", ex);
-        }
-    }
-
-    /**
-     * parseInetSocketAddress
-     * 
-     * @param  channel
-     * @return
-     */
-    public static InetSocketAddress parseInetSocketAddress(Channel channel) {
-        InetSocketAddress destAddr = null;
-        if (channel.remoteAddress() instanceof InetSocketAddress) {
-            destAddr = (InetSocketAddress) channel.remoteAddress();
-        } else if (channel.remoteAddress() != null) {
-            String sendIp = channel.remoteAddress().toString();
-            destAddr = new InetSocketAddress(sendIp, 0);
-        } else {
-            destAddr = new InetSocketAddress("127.0.0.1", 0);
-        }
-        return destAddr;
-    }
-}
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index 6868ba5297..4da90f9f0d 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -20,12 +20,7 @@ package org.apache.inlong.audit.send;
 import org.apache.inlong.audit.protocol.AuditApi;
 import org.apache.inlong.audit.util.AuditConfig;
 import org.apache.inlong.audit.util.AuditData;
-import org.apache.inlong.audit.util.SenderResult;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,147 +28,161 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Audit sender manager
  */
 public class SenderManager {
 
-    public static final Long MAX_REQUEST_ID = 1000000000L;
-    public static final int ALL_CONNECT_CHANNEL = -1;
-    public static final int DEFAULT_CONNECT_CHANNEL = 2;
-    public static final Logger LOG = 
LoggerFactory.getLogger(SenderManager.class);
-    private static final int SEND_INTERVAL_MS = 20;
-    private final SecureRandom sRandom = new 
SecureRandom(Long.toString(System.currentTimeMillis()).getBytes());
-    private final AtomicLong requestIdSeq = new AtomicLong(0L);
-    private final ConcurrentHashMap<Long, AuditData> dataMap = new 
ConcurrentHashMap<>();
-    private final LinkedBlockingQueue<Long> requestIdQueue = new 
LinkedBlockingQueue<>();
-
-    private SenderGroup sender;
-    private int maxConnectChannels = ALL_CONNECT_CHANNEL;
-    // IPList
-    private List<String> currentIpPorts = new ArrayList<>();
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SenderManager.class);
+    private static final int SEND_INTERVAL_MS = 100;
+    private final ConcurrentHashMap<Long, AuditData> failedDataMap = new 
ConcurrentHashMap<>();
     private AuditConfig auditConfig;
-    private long lastCheckTime = System.currentTimeMillis();
+    private Socket socket = new Socket();
+    private static final int PACKAGE_HEADER_LEN = 4;
+    private static final int MAX_RESPONSE_LENGTH = 32 * 1024;
 
-    /**
-     * Constructor
-     */
     public SenderManager(AuditConfig config) {
-        this(config, DEFAULT_CONNECT_CHANNEL);
-    }
-
-    /**
-     * Constructor
-     */
-    public SenderManager(AuditConfig config, int maxConnectChannels) {
-        try {
-            this.auditConfig = config;
-            this.maxConnectChannels = maxConnectChannels;
-            this.sender = new SenderGroup(this);
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage(), ex);
-        }
+        auditConfig = config;
     }
 
-    /**
-     * update config
-     */
-    public void setAuditProxy(HashSet<String> ipPortList) {
-        if (ipPortList.equals(currentIpPorts) && 
!this.sender.isHasSendError()) {
+    public void closeSocket() {
+        if (socket.isClosed()) {
+            LOGGER.info("Audit socket is already closed");
             return;
         }
-        this.sender.setHasSendError(false);
-        List<String> newIpPorts = new ArrayList<>();
-        newIpPorts.addAll(ipPortList);
-        this.currentIpPorts = newIpPorts;
-        int ipSize = ipPortList.size();
-        int needNewSize;
-        if (this.maxConnectChannels == ALL_CONNECT_CHANNEL || 
this.maxConnectChannels >= ipSize) {
-            needNewSize = ipSize;
-        } else {
-            needNewSize = maxConnectChannels;
+        try {
+            socket.close();
+            LOGGER.info("Audit socket closed successfully");
+        } catch (IOException exception) {
+            LOGGER.error("Error closing audit socket", exception);
         }
+    }
 
-        List<String> updateConfigIpLists = new ArrayList<>();
-        List<String> availableIpLists = new ArrayList<>(ipPortList);
-        for (int i = 0; i < needNewSize; i++) {
-            int availableIpSize = availableIpLists.size();
-            int newIpPortIndex = this.sRandom.nextInt(availableIpSize);
-            String ipPort = availableIpLists.remove(newIpPortIndex);
-            updateConfigIpLists.add(ipPort);
-        }
-        LOG.info("needNewSize:{},updateConfigIpLists:{}", needNewSize, 
updateConfigIpLists);
-        if (updateConfigIpLists.size() > 0) {
-            this.sender.updateConfig(updateConfigIpLists);
+    /**
+     * Makes sure the audit socket is usable, reconnecting when it is closed or was never connected.
+     *
+     * @return true if the socket reports itself connected after the check, false if no proxy
+     *         address is available or the reconnect attempt failed
+     */
+    public boolean checkSocket() {
+        if (socket.isClosed() || !socket.isConnected()) {
+            // Declared outside the try block so the catch clause can report the attempted address.
+            InetSocketAddress inetSocketAddress = null;
+            try {
+                inetSocketAddress = ProxyManager.getInstance().getInetSocketAddress();
+                if (inetSocketAddress == null) {
+                    LOGGER.error("Audit inet socket address is null!");
+                    return false;
+                }
+                reconnect(inetSocketAddress, auditConfig.getSocketTimeout());
+            } catch (IOException exception) {
+                // An unconnected socket has no remote address, so log the target instead.
+                LOGGER.error("Connect to {} has exception!", inetSocketAddress, exception);
+                return false;
+            }
         }
+        return socket.isConnected();
     }
 
-    /**
-     * next request id
-     */
-    public Long nextRequestId() {
-        long requestId = requestIdSeq.getAndIncrement();
-        if (requestId > MAX_REQUEST_ID) {
-            requestId = 0L;
-            requestIdSeq.set(requestId);
-        }
-        return requestId;
+    private void reconnect(InetSocketAddress inetSocketAddress, int timeout)
+            throws IOException {
+        socket = new Socket();
+        socket.connect(inetSocketAddress, timeout);
+        socket.setSoTimeout(timeout);
     }
 
     /**
      * Send data with command
      */
-    public void send(AuditApi.BaseCommand baseCommand, AuditApi.AuditRequest 
auditRequest) {
+    public boolean send(AuditApi.BaseCommand baseCommand, 
AuditApi.AuditRequest auditRequest) {
         AuditData data = new AuditData(baseCommand, auditRequest);
-        // cache first
-        Long requestId = baseCommand.getAuditRequest().getRequestId();
-        this.dataMap.putIfAbsent(requestId, data);
-        this.sendData(data.getDataByte());
+        for (int retry = 0; retry < auditConfig.getRetryTimes(); retry++) {
+            if (sendData(data.getDataByte())) {
+                return true;
+            }
+            LOGGER.warn("Failed to send data on attempt {}. Retrying...", 
retry + 1);
+            sleep();
+        }
+
+        LOGGER.error("Failed to send data after {} attempts. Storing data for 
later retry.",
+                auditConfig.getRetryTimes());
+        
failedDataMap.putIfAbsent(baseCommand.getAuditRequest().getRequestId(), data);
+        return false;
+    }
+
+    /**
+     * Reads exactly {@code len} bytes from {@code is} into the start of {@code buffer}.
+     *
+     * @param is     stream to read from
+     * @param buffer destination array, filled from index 0
+     * @param len    number of bytes that must be read
+     * @throws IOException if reading fails or the stream ends before {@code len} bytes arrive
+     */
+    private void readFully(InputStream is, byte[] buffer, int len) throws IOException {
+        int totalBytesRead = 0;
+        while (totalBytesRead < len) {
+            int bytesRead = is.read(buffer, totalBytesRead, len - totalBytesRead);
+            if (bytesRead == -1) {
+                // A truncated frame must not be handed on as if it were complete.
+                throw new IOException("Stream ended after " + totalBytesRead + " of " + len + " bytes");
+            }
+            totalBytesRead += bytesRead;
+        }
     }
 
     /**
      * Send data byte array
      */
-    private void sendData(byte[] data) {
-        if (data == null || data.length <= 0) {
-            LOG.warn("send data is empty!");
-            return;
+    private boolean sendData(byte[] data) {
+        if (data == null || data.length == 0) {
+            LOGGER.warn("Send data is empty!");
+            return false;
         }
-        ByteBuf dataBuf = ByteBufAllocator.DEFAULT.buffer(data.length);
-        dataBuf.writeBytes(data);
-        SenderResult result = this.sender.send(dataBuf);
-        if (!result.result) {
-            this.sender.setHasSendError(true);
+        if (!checkSocket()) {
+            return false;
+        }
+        try {
+            OutputStream outputStream = socket.getOutputStream();
+            InputStream inputStream = socket.getInputStream();
+
+            outputStream.write(data);
+
+            // Reply frame: 4-byte big-endian body length, then the serialized BaseCommand.
+            byte[] header = new byte[PACKAGE_HEADER_LEN];
+            readFully(inputStream, header, PACKAGE_HEADER_LEN);
+
+            int bodyLen = ((header[0] & 0xFF) << 24) |
+                    ((header[1] & 0xFF) << 16) |
+                    ((header[2] & 0xFF) << 8) |
+                    (header[3] & 0xFF);
+
+            // A header with the top bit set decodes to a negative length. Reject it together
+            // with oversized replies so the allocation below cannot throw an unchecked exception.
+            if (bodyLen < 0 || bodyLen > MAX_RESPONSE_LENGTH) {
+                LOGGER.error("Invalid audit reply length {}, closing socket", bodyLen);
+                closeSocket();
+                return false;
+            }
+
+            byte[] body = new byte[bodyLen];
+            readFully(inputStream, body, bodyLen);
+
+            AuditApi.BaseCommand reply = AuditApi.BaseCommand.parseFrom(body);
+            return AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(reply.getAuditReply().getRspCode());
+        } catch (IOException exception) {
+            closeSocket();
+            LOGGER.error("Send audit data to proxy has exception!", exception);
+            return false;
         }
     }
 
     /**
      * Clean up the backlog of unsent message packets
      */
-    public void clearBuffer() {
-        LOG.info("Audit failed cache size: {}", this.dataMap.size());
-        for (AuditData data : this.dataMap.values()) {
-            this.sendData(data.getDataByte());
-            this.sleep();
+    public void checkFailedData() {
+        LOGGER.info("Audit failed cache size: {}", failedDataMap.size());
+
+        Iterator<Map.Entry<Long, AuditData>> iterator = 
failedDataMap.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Long, AuditData> entry = iterator.next();
+            if (sendData(entry.getValue().getDataByte())) {
+                iterator.remove();
+                sleep();
+            }
         }
-        if (this.dataMap.size() == 0) {
+        if (failedDataMap.isEmpty()) {
             checkAuditFile();
         }
-        if (this.dataMap.size() > auditConfig.getMaxCacheRow()) {
-            LOG.info("failed cache size: {}>{}", this.dataMap.size(), 
auditConfig.getMaxCacheRow());
+        if (failedDataMap.size() > auditConfig.getMaxCacheRow()) {
+            LOGGER.info("Failed cache size: {} > {}", failedDataMap.size(), 
auditConfig.getMaxCacheRow());
             writeLocalFile();
-            this.dataMap.clear();
+            failedDataMap.clear();
         }
     }
 
@@ -181,29 +190,37 @@ public class SenderManager {
      * write local file
      */
     private void writeLocalFile() {
+        if (!checkFilePath()) {
+            LOGGER.error("{} is not exist!", auditConfig.getFilePath());
+            return;
+        }
+
+        File file = new File(auditConfig.getDisasterFile());
+
         try {
-            if (!checkFilePath()) {
-                return;
-            }
-            File file = new File(auditConfig.getDisasterFile());
-            if (!file.exists()) {
+            if (file.exists()) {
+                if (file.length() > auditConfig.getMaxFileSize()) {
+                    if (!file.delete()) {
+                        LOGGER.error("Failed to delete file: {}", 
file.getAbsolutePath());
+                        return;
+                    }
+                    LOGGER.info("Deleted file due to exceeding max file size: 
{}", file.getAbsolutePath());
+                }
+            } else {
                 if (!file.createNewFile()) {
-                    LOG.error("create file {} failed", 
auditConfig.getDisasterFile());
+                    LOGGER.error("Failed to create file: {}", 
file.getAbsolutePath());
                     return;
                 }
-                LOG.info("create file {} success", 
auditConfig.getDisasterFile());
+                LOGGER.info("Created file: {}", file.getAbsolutePath());
             }
-            if (file.length() > auditConfig.getMaxFileSize()) {
-                file.delete();
-                return;
+
+            try (FileOutputStream fos = new FileOutputStream(file);
+                    ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(fos)) {
+
+                objectOutputStream.writeObject(failedDataMap);
             }
-            FileOutputStream fos = new FileOutputStream(file);
-            ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(fos);
-            objectOutputStream.writeObject(dataMap);
-            objectOutputStream.close();
-            fos.close();
         } catch (IOException e) {
-            LOG.error("write local file error:{}", e.getMessage(), e);
+            LOGGER.error("Error writing to local file: {}", e.getMessage(), e);
         }
     }
 
@@ -214,9 +231,10 @@ public class SenderManager {
         File file = new File(auditConfig.getFilePath());
         if (!file.exists()) {
             if (!file.mkdirs()) {
+                LOGGER.error("Create file {} failed!", 
auditConfig.getFilePath());
                 return false;
             }
-            LOG.info("create file {} success", auditConfig.getFilePath());
+            LOGGER.info("Create file {} success", auditConfig.getFilePath());
         }
         return true;
     }
@@ -225,27 +243,30 @@ public class SenderManager {
      * check audit file
      */
     private void checkAuditFile() {
-        try {
-            File file = new File(auditConfig.getDisasterFile());
-            if (!file.exists()) {
-                return;
-            }
-            FileInputStream inputStream = new 
FileInputStream(auditConfig.getDisasterFile());
-            ObjectInputStream objectStream = new 
ObjectInputStream(inputStream);
+        File file = new File(auditConfig.getDisasterFile());
+        if (!file.exists()) {
+            return;
+        }
+
+        try (FileInputStream inputStream = new FileInputStream(file);
+                ObjectInputStream objectStream = new 
ObjectInputStream(inputStream)) {
+
             ConcurrentHashMap<Long, AuditData> fileData = 
(ConcurrentHashMap<Long, AuditData>) objectStream
                     .readObject();
+
             for (Map.Entry<Long, AuditData> entry : fileData.entrySet()) {
-                if (this.dataMap.size() < (auditConfig.getMaxCacheRow() / 2)) {
-                    this.dataMap.putIfAbsent(entry.getKey(), entry.getValue());
+                if (failedDataMap.size() < (auditConfig.getMaxCacheRow() / 2)) 
{
+                    failedDataMap.putIfAbsent(entry.getKey(), 
entry.getValue());
                 }
-                this.sendData(entry.getValue().getDataByte());
-                this.sleep();
+                sendData(entry.getValue().getDataByte());
+                sleep();
             }
-            objectStream.close();
-            inputStream.close();
-            file.delete();
         } catch (IOException | ClassNotFoundException e) {
-            LOG.error("check audit file error:{}", e.getMessage(), e);
+            LOGGER.error("check audit file error:{}", e.getMessage(), e);
+        } finally {
+            if (!file.delete()) {
+                LOGGER.error("Failed to delete file: {}", 
file.getAbsolutePath());
+            }
         }
     }
 
@@ -253,60 +274,7 @@ public class SenderManager {
      * get data map size
      */
     public int getDataMapSize() {
-        return this.dataMap.size();
-    }
-
-    /**
-     * processing return package
-     */
-    public void onMessageReceived(ChannelHandlerContext ctx, byte[] msg) {
-        if (null == msg) {
-            return;
-        }
-        try {
-            // Analyze abnormal events
-            AuditApi.BaseCommand baseCommand = 
AuditApi.BaseCommand.parseFrom(msg);
-            // Parse request id
-            Long requestId = baseCommand.getAuditReply().getRequestId();
-            AuditData data = this.dataMap.get(requestId);
-            if (data == null) {
-                LOG.error("Can not find the request id onMessageReceived 
{},message: {}",
-                        requestId, baseCommand.getAuditReply().getMessage());
-                if (LOG.isDebugEnabled()) {
-                    for (Map.Entry<Long, AuditData> entry : 
this.dataMap.entrySet()) {
-                        LOG.debug("Data map key:{}, request id:{}", 
entry.getKey(), requestId);
-                    }
-                }
-                return;
-            }
-            // Check audit-proxy response code
-            LOG.info("Audit-proxy response code: {}", 
baseCommand.getAuditReply().getRspCode());
-            if 
(AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(baseCommand.getAuditReply().getRspCode()))
 {
-                this.dataMap.remove(requestId);
-                return;
-            }
-            LOG.error("Audit-proxy has error response! code={}, message={}",
-                    baseCommand.getAuditReply().getRspCode(), 
baseCommand.getAuditReply().getMessage());
-
-            if (data.increaseResendTimes() < SenderGroup.MAX_SEND_TIMES) {
-                this.sendData(data.getDataByte());
-            }
-        } catch (Throwable ex) {
-            LOG.error("Receive Message exception:{}", ex.getMessage(), ex);
-            this.sender.setHasSendError(true);
-        }
-    }
-
-    /**
-     * Handle the packet return exception
-     */
-    public void onExceptionCaught(ChannelHandlerContext ctx, Throwable e) {
-        LOG.error("channel context " + ctx + " occurred exception: ", e);
-        try {
-            this.sender.setHasSendError(true);
-        } catch (Throwable ex) {
-            LOG.error("setHasSendError error:{}", ex.getMessage(), ex);
-        }
+        return this.failedDataMap.size();
     }
 
     /**
@@ -316,7 +284,7 @@ public class SenderManager {
         try {
             Thread.sleep(SEND_INTERVAL_MS);
         } catch (Throwable ex) {
-            LOG.error("sleep error:{}", ex.getMessage(), ex);
+            LOGGER.error("sleep error:{}", ex.getMessage(), ex);
         }
     }
 
@@ -326,18 +294,4 @@ public class SenderManager {
     public void setAuditConfig(AuditConfig config) {
         auditConfig = config;
     }
-
-    public void release(Channel channel) {
-        this.sender.release(channel);
-    }
-
-    /**
-     * get dataMap
-     *
-     * @return the dataMap
-     */
-    public ConcurrentHashMap<Long, AuditData> getDataMap() {
-        return dataMap;
-    }
-
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditConfig.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditConfig.java
index c7ac2f3efa..11f6e09b38 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditConfig.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditConfig.java
@@ -17,20 +17,25 @@
 
 package org.apache.inlong.audit.util;
 
+import lombok.Data;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Data
 public class AuditConfig {
 
     private static final Logger logger = 
LoggerFactory.getLogger(AuditConfig.class);
-    private static String FILE_PATH = "/data/inlong/audit/";
-    private static final int FILE_SIZE = 500 * 1024 * 1024;
-    private static final int MAX_CACHE_ROWS = 2000000;
+    private static String FILE_PATH = "./inLong-audit/";
+    private static final int FILE_SIZE = 100 * 1024 * 1024;
+    private static final int MAX_CACHE_ROWS = 1000000;
     private static final int MIN_CACHE_ROWS = 100;
 
     private String filePath;
     private int maxCacheRow;
     private int maxFileSize = FILE_SIZE;
+    private String disasterFileName = "disaster.data";
+    private int socketTimeout = 30000;
+    private int retryTimes = 2;
 
     public AuditConfig(String filePath, int maxCacheRow) {
         if (filePath == null || filePath.length() == 0) {
@@ -50,33 +55,8 @@ public class AuditConfig {
         this.maxCacheRow = MAX_CACHE_ROWS;
     }
 
-    public void setFilePath(String filePath) {
-        this.filePath = filePath;
-    }
-
-    public void setMaxCacheRow(int maxCacheRow) {
-        this.maxCacheRow = maxCacheRow;
-    }
-
-    public String getFilePath() {
-        return filePath;
-    }
-
-    public int getMaxCacheRow() {
-        return maxCacheRow;
-    }
-
-    public int getMaxFileSize() {
-        return maxFileSize;
-    }
-
-    public void setMaxFileSize(int maxFileSize) {
-        this.maxFileSize = maxFileSize;
-    }
-
     public String getDisasterFile() {
         return filePath + "/" + disasterFileName;
     }
 
-    private String disasterFileName = "disaster.data";
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java
deleted file mode 100644
index af15d2a0ad..0000000000
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.util;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageDecoder;
-
-import java.util.List;
-
-public class Decoder extends MessageToMessageDecoder<ByteBuf> {
-
-    // Maximum return packet size
-    private static final int MAX_RESPONSE_LENGTH = 8 * 1024 * 1024;
-
-    /**
-     * decoding
-     */
-    @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,
-            List<Object> out) throws Exception {
-        // Every time you need to read the complete package (that is, read to 
the end of the package),
-        // otherwise only the first one will be parsed correctly,
-        // which will adversely affect the parsing of the subsequent package
-        buffer.markReaderIndex();
-        // Packet composition: 4 bytes length content + ProtocolBuffer content
-        int totalLen = buffer.readInt();
-        // Respond to abnormal channel, interrupt in time to avoid stuck
-        if (totalLen > MAX_RESPONSE_LENGTH) {
-            ctx.channel().close();
-            return;
-        }
-        // If the package is not complete, continue to wait for the return 
package
-        if (buffer.readableBytes() < totalLen) {
-            buffer.resetReaderIndex();
-            return;
-        }
-        byte[] returnBuffer = new byte[totalLen];
-        buffer.readBytes(returnBuffer, 0, totalLen);
-        out.add(returnBuffer);
-    }
-}
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/EventLoopUtil.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/EventLoopUtil.java
deleted file mode 100644
index 80e400338f..0000000000
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/EventLoopUtil.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.util;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollChannelOption;
-import io.netty.channel.epoll.EpollDatagramChannel;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollMode;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.DatagramChannel;
-import io.netty.channel.socket.ServerSocketChannel;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioDatagramChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.Future;
-
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadFactory;
-
-public class EventLoopUtil {
-
-    public EventLoopUtil() {
-    }
-
-    public static EventLoopGroup newEventLoopGroup(int nThreads, boolean 
enableBusyWait, ThreadFactory threadFactory) {
-        if (!Epoll.isAvailable()) {
-            return new NioEventLoopGroup(nThreads, threadFactory);
-        } else if (!enableBusyWait) {
-            return new EpollEventLoopGroup(nThreads, threadFactory);
-        } else {
-            EpollEventLoopGroup eventLoopGroup = new 
EpollEventLoopGroup(nThreads, threadFactory, () -> {
-                return (selectSupplier, hasTasks) -> {
-                    return -3;
-                };
-            });
-            return eventLoopGroup;
-        }
-    }
-
-    public static Class<? extends SocketChannel> 
getClientSocketChannelClass(EventLoopGroup eventLoopGroup) {
-        return eventLoopGroup instanceof EpollEventLoopGroup
-                ? EpollSocketChannel.class
-                : NioSocketChannel.class;
-    }
-
-    public static Class<? extends ServerSocketChannel> 
getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
-        return eventLoopGroup instanceof EpollEventLoopGroup
-                ? EpollServerSocketChannel.class
-                : NioServerSocketChannel.class;
-    }
-
-    public static Class<? extends DatagramChannel> 
getDatagramChannelClass(EventLoopGroup eventLoopGroup) {
-        return eventLoopGroup instanceof EpollEventLoopGroup
-                ? EpollDatagramChannel.class
-                : NioDatagramChannel.class;
-    }
-
-    public static void enableTriggeredMode(ServerBootstrap bootstrap) {
-        if (Epoll.isAvailable()) {
-            bootstrap.childOption(EpollChannelOption.EPOLL_MODE, 
EpollMode.LEVEL_TRIGGERED);
-        }
-
-    }
-
-    public static CompletableFuture<Void> shutdownGracefully(EventLoopGroup 
eventLoopGroup) {
-        return toCompletableFutureVoid(eventLoopGroup.shutdownGracefully());
-    }
-
-    /**
-     * get CompletableFuture by Future
-     *
-     * @param future Future
-     * @return CompletableFuture
-     */
-    public static CompletableFuture<Void> toCompletableFutureVoid(Future<?> 
future) {
-        Objects.requireNonNull(future, "future cannot be null");
-
-        CompletableFuture<Void> adapter = new CompletableFuture<>();
-        if (future.isDone()) {
-            if (future.isSuccess()) {
-                adapter.complete(null);
-            } else {
-                adapter.completeExceptionally(future.cause());
-            }
-        } else {
-            future.addListener(f -> {
-                if (f.isSuccess()) {
-                    adapter.complete(null);
-                } else {
-                    adapter.completeExceptionally(f.cause());
-                }
-            });
-        }
-        return adapter;
-    }
-}
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/RequestIdUtils.java
similarity index 59%
rename from 
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
rename to 
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/RequestIdUtils.java
index 99632002ea..8575e9afca 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/RequestIdUtils.java
@@ -17,33 +17,22 @@
 
 package org.apache.inlong.audit.util;
 
-import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicLong;
 
-public class SenderResult {
+public class RequestIdUtils {
 
-    public final InetSocketAddress addr;
-    public boolean result;
+    private static final Long MAX_REQUEST_ID = 1000000000L;
+    private static final AtomicLong requestIdSeq = new AtomicLong(0L);
 
     /**
-     * Constructor
-     *
-     * @param addr
-     * @param result
+     * Next request id
      */
-    public SenderResult(InetSocketAddress addr, boolean result) {
-        this.addr = addr;
-        this.result = result;
-    }
-
-    /**
-     * Constructor
-     *
-     * @param sendIp
-     * @param sendPort
-     * @param result
-     */
-    public SenderResult(String sendIp, int sendPort, boolean result) {
-        this.addr = new InetSocketAddress(sendIp, sendPort);
-        this.result = result;
+    public static Long nextRequestId() {
+        // Advance and wrap in a single atomic step. The previous
+        // getAndIncrement()/set(0) sequence handed out id 0 twice after every
+        // wrap and could race when several threads crossed the limit together.
+        final long maxRequestId = MAX_REQUEST_ID;
+        return requestIdSeq.getAndUpdate(
+                current -> current >= maxRequestId ? 0L : current + 1);
     }
 }
diff --git 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/ProxyManagerTest.java
similarity index 52%
copy from 
inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
copy to 
inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/ProxyManagerTest.java
index da936e188e..4fdad0f0c3 100644
--- 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
+++ 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/ProxyManagerTest.java
@@ -17,34 +17,28 @@
 
 package org.apache.inlong.audit.send;
 
-import org.apache.inlong.audit.util.AuditConfig;
-
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
-
-public class SenderManagerTest {
-
-    private AuditConfig testConfig = new AuditConfig();
+import java.net.InetSocketAddress;
+import java.util.HashSet;
 
-    @Test
-    public void nextRequestId() {
-        SenderManager testManager = new SenderManager(testConfig);
-        Long requestId = testManager.nextRequestId();
-        assertTrue(requestId == 0);
-
-        requestId = testManager.nextRequestId();
-        assertTrue(requestId == 1);
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-        requestId = testManager.nextRequestId();
-        assertTrue(requestId == 2);
-    }
+public class ProxyManagerTest {
 
     @Test
-    public void clearBuffer() {
-        SenderManager testManager = new SenderManager(testConfig);
-        testManager.clearBuffer();
-        int dataMapSize = testManager.getDataMapSize();
-        assertTrue(dataMapSize == 0);
+    public void testProxyManager() {
+        HashSet<String> ipPortList = new HashSet<>();
+        ipPortList.add("172.0.0.1:10081");
+        ipPortList.add("172.0.0.2:10081");
+        ipPortList.add("172.0.0.3:10081");
+        ipPortList.add("172.0.0.4:10081");
+        ipPortList.add("172.0.0.5:10081");
+        ProxyManager.getInstance().setAuditProxy(ipPortList);
+        InetSocketAddress inetSocketAddress = ProxyManager.getInstance().getInetSocketAddress();
+        assertEquals(10081, inetSocketAddress.getPort());
+        assertTrue(inetSocketAddress.getAddress().getHostAddress().startsWith("172.0.0.")
+                && inetSocketAddress.getAddress().getHostAddress().length() == 9);
     }
-}
\ No newline at end of file
+}
diff --git 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
deleted file mode 100644
index 19d73b7476..0000000000
--- 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.send;
-
-import org.apache.inlong.audit.util.AuditConfig;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class SenderGroupTest {
-
-    AuditConfig testConfig = new AuditConfig();
-    SenderManager testManager = new SenderManager(testConfig);
-    SenderHandler clientHandler = new org.apache.inlong.audit.send.SenderHandler(testManager);
-    SenderGroup sender = new org.apache.inlong.audit.send.SenderGroup(testManager);
-
-    @Test
-    public void isHasSendError() {
-        sender.setHasSendError(false);
-        boolean isError = sender.isHasSendError();
-        assertFalse(isError);
-        sender.setHasSendError(true);
-        isError = sender.isHasSendError();
-        assertTrue(isError);
-    }
-
-    @Test
-    public void setHasSendError() {
-        sender.setHasSendError(false);
-        boolean isError = sender.isHasSendError();
-        assertFalse(isError);
-        sender.setHasSendError(true);
-        isError = sender.isHasSendError();
-        assertTrue(isError);
-    }
-}
\ No newline at end of file
diff --git 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
index da936e188e..ea53645b4b 100644
--- 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
+++ 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
@@ -27,23 +27,10 @@ public class SenderManagerTest {
 
     private AuditConfig testConfig = new AuditConfig();
 
-    @Test
-    public void nextRequestId() {
-        SenderManager testManager = new SenderManager(testConfig);
-        Long requestId = testManager.nextRequestId();
-        assertTrue(requestId == 0);
-
-        requestId = testManager.nextRequestId();
-        assertTrue(requestId == 1);
-
-        requestId = testManager.nextRequestId();
-        assertTrue(requestId == 2);
-    }
-
     @Test
     public void clearBuffer() {
         SenderManager testManager = new SenderManager(testConfig);
-        testManager.clearBuffer();
+        testManager.checkFailedData();
         int dataMapSize = testManager.getDataMapSize();
         assertTrue(dataMapSize == 0);
     }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/RequestIdUtilsTest.java
similarity index 56%
rename from 
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
rename to 
inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/RequestIdUtilsTest.java
index 233677ef59..1a10ef95db 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
+++ 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/RequestIdUtilsTest.java
@@ -15,24 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.send;
+package org.apache.inlong.audit.util;
 
-import org.apache.inlong.audit.util.Decoder;
+import org.junit.Test;
 
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
+import static org.junit.Assert.assertTrue;
 
-public class ClientPipelineFactory extends ChannelInitializer<SocketChannel> {
+public class RequestIdUtilsTest {
 
-    private SenderManager senderManager;
+    @Test
+    public void testNextRequestId() {
 
-    public ClientPipelineFactory(SenderManager senderManager) {
-        this.senderManager = senderManager;
-    }
+        Long requestId = RequestIdUtils.nextRequestId();
+        assertTrue(requestId == 0);
+
+        requestId = RequestIdUtils.nextRequestId();
+        assertTrue(requestId == 1);
 
-    @Override
-    public void initChannel(SocketChannel ch) throws Exception {
-        ch.pipeline().addLast("contentDecoder", new Decoder());
-        ch.pipeline().addLast("handler", new SenderHandler(senderManager));
+        requestId = RequestIdUtils.nextRequestId();
+        assertTrue(requestId == 2);
     }
 }

Reply via email to