This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4bd806e9fc [INLONG-10365][Audit] Optimizing TCP sticky packets may
lead to duplication of audit data (#10366)
4bd806e9fc is described below
commit 4bd806e9fcd02086ad5719c8ff100eb253bffece
Author: doleyzi <[email protected]>
AuthorDate: Thu Jun 6 20:34:39 2024 +0800
[INLONG-10365][Audit] Optimizing TCP sticky packets may lead to duplication
of audit data (#10366)
* Optimizing TCP sticky packets may lead to duplication of audit data
* Remove useless code
* Added tcp packet return judgment
* update test case
* update test case
* Optimizing TCP sticky packets may lead to duplication of audit data
* use HashSet instead of List
* Use the interface Set instead of the specific implementation
---
.../org/apache/inlong/audit/AuditReporterImpl.java | 63 ++--
.../AuditMetric.java} | 36 ++-
.../org/apache/inlong/audit/send/ProxyManager.java | 65 ++++
.../apache/inlong/audit/send/SenderChannel.java | 215 ------------
.../org/apache/inlong/audit/send/SenderGroup.java | 270 ----------------
.../apache/inlong/audit/send/SenderHandler.java | 110 -------
.../apache/inlong/audit/send/SenderManager.java | 360 +++++++++------------
.../org/apache/inlong/audit/util/AuditConfig.java | 36 +--
.../java/org/apache/inlong/audit/util/Decoder.java | 57 ----
.../apache/inlong/audit/util/EventLoopUtil.java | 118 -------
.../{SenderResult.java => RequestIdUtils.java} | 35 +-
...enderManagerTest.java => ProxyManagerTest.java} | 42 ++-
.../apache/inlong/audit/send/SenderGroupTest.java | 53 ---
.../inlong/audit/send/SenderManagerTest.java | 15 +-
.../inlong/audit/util/RequestIdUtilsTest.java} | 26 +-
15 files changed, 338 insertions(+), 1163 deletions(-)
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index da3a1144e5..745821b512 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -18,15 +18,18 @@
package org.apache.inlong.audit;
import org.apache.inlong.audit.entity.AuditInformation;
+import org.apache.inlong.audit.entity.AuditMetric;
import org.apache.inlong.audit.entity.FlowType;
import org.apache.inlong.audit.loader.SocketAddressListLoader;
import org.apache.inlong.audit.protocol.AuditApi;
+import org.apache.inlong.audit.send.ProxyManager;
import org.apache.inlong.audit.send.SenderManager;
import org.apache.inlong.audit.util.AuditConfig;
import org.apache.inlong.audit.util.AuditDimensions;
import org.apache.inlong.audit.util.AuditManagerUtils;
import org.apache.inlong.audit.util.AuditValues;
import org.apache.inlong.audit.util.Config;
+import org.apache.inlong.audit.util.RequestIdUtils;
import org.apache.inlong.audit.util.StatInfo;
import org.apache.commons.lang3.ClassUtils;
@@ -35,12 +38,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -61,7 +64,7 @@ public class AuditReporterImpl implements Serializable {
private static final int BATCH_NUM = 100;
private static final int PERIOD = 1000 * 60;
// Resource isolation key is used in checkpoint, the default value is 0.
- private static final long DEFAULT_ISOLATE_KEY = 0;
+ public static final long DEFAULT_ISOLATE_KEY = 0;
private final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>>
preStatMap =
new ConcurrentHashMap<>();
@@ -69,7 +72,7 @@ public class AuditReporterImpl implements Serializable {
new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>>
expiredStatMap =
new ConcurrentHashMap<>();
- private final ConcurrentHashMap<Long, List<String>> expiredKeyList = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, HashSet<String>> expiredKeyList =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, Long> flushTime = new
ConcurrentHashMap<>();
private final Config config = new Config();
private final ScheduledExecutorService timeoutExecutor =
Executors.newSingleThreadScheduledExecutor();
@@ -83,6 +86,8 @@ public class AuditReporterImpl implements Serializable {
private int flushStatThreshold = 100;
private boolean autoFlush = true;
+ private AuditMetric auditMetric = new AuditMetric();
+
/**
* Set stat threshold
*
@@ -116,12 +121,12 @@ public class AuditReporterImpl implements Serializable {
public void run() {
try {
loadIpPortList();
+ checkFlushTime();
if (autoFlush) {
flush(DEFAULT_ISOLATE_KEY);
}
- checkFlushTime();
} catch (Exception e) {
- LOGGER.error(e.getMessage());
+ LOGGER.error("Audit run has exception!", e);
}
}
@@ -190,7 +195,7 @@ public class AuditReporterImpl implements Serializable {
init();
initialized = true;
}
- this.manager.setAuditProxy(ipPortList);
+ ProxyManager.getInstance().setAuditProxy(ipPortList);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage());
} finally {
@@ -285,7 +290,7 @@ public class AuditReporterImpl implements Serializable {
* Flush audit data by default audit version
*/
public synchronized void flush() {
- flush(DEFAULT_AUDIT_VERSION);
+ flush(DEFAULT_ISOLATE_KEY);
}
/**
@@ -296,11 +301,10 @@ public class AuditReporterImpl implements Serializable {
|| flushStat.addAndGet(1) > flushStatThreshold) {
return;
}
+ long startTime = System.currentTimeMillis();
LOGGER.info("Audit flush isolate key {} ", isolateKey);
- manager.clearBuffer();
+ manager.checkFailedData();
resetStat();
- LOGGER.info("pre stat map size {} {} {} {}", this.preStatMap.size(),
this.expiredStatMap.size(),
- this.summaryStatMap.size(), this.expiredKeyList.size());
summaryExpiredStatMap(isolateKey);
@@ -322,7 +326,13 @@ public class AuditReporterImpl implements Serializable {
clearExpiredKey(isolateKey);
- LOGGER.info("Finish report audit data");
+ manager.closeSocket();
+
+ LOGGER.info("Success report {} package, Failed report {} package,
total {} message, cost: {} ms",
+ auditMetric.getSuccessPack(), auditMetric.getFailedPack(),
auditMetric.getTotalMsg(),
+ System.currentTimeMillis() - startTime);
+
+ auditMetric.reset();
}
/**
@@ -331,7 +341,11 @@ public class AuditReporterImpl implements Serializable {
private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
AuditApi.BaseCommand.Builder baseCommand =
AuditApi.BaseCommand.newBuilder();
baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
- manager.send(baseCommand.build(), auditRequest);
+ if (manager.send(baseCommand.build(), auditRequest)) {
+ auditMetric.addSuccessPack(1);
+ } else {
+ auditMetric.addFailedPack(1);
+ }
}
/**
@@ -385,16 +399,14 @@ public class AuditReporterImpl implements Serializable {
* Summary pre stat map
*/
private void summaryPreStatMap(long isolateKey, ConcurrentHashMap<String,
StatInfo> statInfo) {
- List<String> expiredKeys =
this.expiredKeyList.computeIfAbsent(isolateKey, k -> new ArrayList<>());
+ Set<String> expiredKeys =
this.expiredKeyList.computeIfAbsent(isolateKey, k -> new HashSet<>());
for (Map.Entry<String, StatInfo> entry : statInfo.entrySet()) {
String key = entry.getKey();
StatInfo value = entry.getValue();
// If there is no data, enter the list to be eliminated
if (value.count.get() == 0) {
- if (!expiredKeys.contains(key)) {
- expiredKeys.add(key);
- }
+ expiredKeys.add(key);
continue;
}
sumThreadGroup(isolateKey, key, value);
@@ -405,10 +417,10 @@ public class AuditReporterImpl implements Serializable {
* Clear expired key
*/
private void clearExpiredKey(long isolateKey) {
- Iterator<Map.Entry<Long, List<String>>> iterator =
+ Iterator<Map.Entry<Long, HashSet<String>>> iterator =
this.expiredKeyList.entrySet().iterator();
while (iterator.hasNext()) {
- Map.Entry<Long, List<String>> entry = iterator.next();
+ Map.Entry<Long, HashSet<String>> entry = iterator.next();
if (entry.getValue().isEmpty()) {
LOGGER.info("Remove the key of expired key list: {},isolate
key: {}", entry.getKey(), isolateKey);
iterator.remove();
@@ -451,7 +463,7 @@ public class AuditReporterImpl implements Serializable {
.setSdkTs(sdkTime).setPacketId(packageId)
.build();
AuditApi.AuditRequest.Builder requestBuild =
AuditApi.AuditRequest.newBuilder();
-
requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId());
+ requestBuild.setMsgHeader(msgHeader);
// Process the stat info for all threads
for (Map.Entry<String, StatInfo> entry :
summaryStatMap.get(isolateKey).entrySet()) {
// Entry key order: logTime inlongGroupID inlongStreamID auditID
auditTag auditVersion
@@ -479,18 +491,23 @@ public class AuditReporterImpl implements Serializable {
if (dataId++ >= BATCH_NUM) {
dataId = 0;
packageId++;
- sendByBaseCommand(requestBuild.build());
- requestBuild.clearMsgBody();
+ sendData(requestBuild);
}
}
if (requestBuild.getMsgBodyCount() > 0) {
- sendByBaseCommand(requestBuild.build());
- requestBuild.clearMsgBody();
+ sendData(requestBuild);
}
summaryStatMap.get(isolateKey).clear();
}
+ private void sendData(AuditApi.AuditRequest.Builder requestBuild) {
+ requestBuild.setRequestId(RequestIdUtils.nextRequestId());
+ sendByBaseCommand(requestBuild.build());
+ auditMetric.addTotalMsg(requestBuild.getMsgBodyCount());
+ requestBuild.clearMsgBody();
+ }
+
/**
* Check flush time
*/
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
similarity index 54%
copy from
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
copy to
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
index 233677ef59..000642bd64 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditMetric.java
@@ -15,24 +15,36 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.send;
+package org.apache.inlong.audit.entity;
-import org.apache.inlong.audit.util.Decoder;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class AuditMetric {
-public class ClientPipelineFactory extends ChannelInitializer<SocketChannel> {
+ private Long successPack = 0L;
+ private Long failedPack = 0L;
+ private Long totalMsg = 0L;
- private SenderManager senderManager;
+ public void addSuccessPack(long successPack) {
+ this.successPack += successPack;
+ }
+
+ public void addFailedPack(long failedPack) {
+ this.failedPack += failedPack;
+ }
- public ClientPipelineFactory(SenderManager senderManager) {
- this.senderManager = senderManager;
+ public void addTotalMsg(long totalMsg) {
+ this.totalMsg += totalMsg;
}
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast("contentDecoder", new Decoder());
- ch.pipeline().addLast("handler", new SenderHandler(senderManager));
+ public void reset() {
+ successPack = 0L;
+ failedPack = 0L;
+ totalMsg = 0L;
}
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
new file mode 100644
index 0000000000..a3dfcd4e75
--- /dev/null
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.send;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class ProxyManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ProxyManager.class);
+ private static final ProxyManager instance = new ProxyManager();
+ private final List<String> currentIpPorts = new CopyOnWriteArrayList<>();
+
+ private ProxyManager() {
+ }
+
+ public static ProxyManager getInstance() {
+ return instance;
+ }
+
+ /**
+ * update config
+ */
+ public void setAuditProxy(HashSet<String> ipPortList) {
+ if (!ipPortList.equals(new HashSet<>(currentIpPorts))) {
+ currentIpPorts.clear();
+ currentIpPorts.addAll(ipPortList);
+ }
+ }
+
+ public InetSocketAddress getInetSocketAddress() {
+ if (currentIpPorts.isEmpty()) {
+ return null;
+ }
+ Random rand = new Random();
+ String randomElement =
currentIpPorts.get(rand.nextInt(currentIpPorts.size()));
+ String[] ipPort = randomElement.split(":");
+ if (ipPort.length != 2) {
+ LOGGER.error("Invalid IP:Port format: {}", randomElement);
+ return null;
+ }
+ return new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
+ }
+}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
deleted file mode 100644
index 8977221ec7..0000000000
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.send;
-
-import org.apache.inlong.audit.util.EventLoopUtil;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.Attribute;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-
-public class SenderChannel {
-
- private static final Logger LOG =
LoggerFactory.getLogger(SenderChannel.class);
-
- public static final int DEFAULT_SEND_THREADNUM = 1;
- public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
- public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
-
- private InetSocketAddress addr;
- private Channel channel;
- private String channelKey;
- private Semaphore packToken;
- private Bootstrap client;
- private SenderManager senderManager;
-
- /**
- * Constructor
- *
- * @param addr
- */
- public SenderChannel(InetSocketAddress addr, int maxSynchRequest,
SenderManager senderManager) {
- this.addr = addr;
- this.packToken = new Semaphore(maxSynchRequest);
- this.senderManager = senderManager;
- }
-
- /**
- * Try acquire channel
- *
- * @return
- */
- public boolean tryAcquire() {
- return packToken.tryAcquire();
- }
-
- /**
- * Try acquire channel
- *
- * @return
- */
- public boolean acquire() {
- try {
- packToken.acquire();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return true;
- }
-
- /**
- * release channel
- */
- public void release() {
- packToken.release();
- }
-
- /**
- * toString
- */
- @Override
- public String toString() {
- return addr.toString();
- }
-
- /**
- * get addr
- * @return the addr
- */
- public InetSocketAddress getAddr() {
- return addr;
- }
-
- /**
- * set addr
- * @param addr the addr to set
- */
- public void setAddr(InetSocketAddress addr) {
- this.addr = addr;
- }
-
- /**
- * get channel
- *
- * @return the channel
- */
- public Channel getChannel() {
- return channel;
- }
-
- /**
- * set channel
- *
- * @param channel the channel to set
- */
- public void setChannel(Channel channel) {
- this.channel = channel;
- Attribute<String> attr = this.channel.attr(SenderGroup.CHANNEL_KEY);
- attr.set(channelKey);
- }
-
- /**
- * get channelKey
- * @return the channelKey
- */
- public String getChannelKey() {
- return channelKey;
- }
-
- /**
- * set channelKey
- * @param channelKey the channelKey to set
- */
- public void setChannelKey(String channelKey) {
- this.channelKey = channelKey;
- }
-
- private void init() {
- ThreadFactory selfDefineFactory = new
DefaultThreadFactory("audit-client-io" + Thread.currentThread().getId(),
- Thread.currentThread().isDaemon());
-
- EventLoopGroup eventLoopGroup =
EventLoopUtil.newEventLoopGroup(DEFAULT_SEND_THREADNUM,
- false, selfDefineFactory);
- client = new Bootstrap();
- client.group(eventLoopGroup);
-
client.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
- client.option(ChannelOption.SO_KEEPALIVE, true);
- client.option(ChannelOption.TCP_NODELAY, false);
- client.option(ChannelOption.SO_REUSEADDR, false);
- client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
- client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
- client.handler(new ClientPipelineFactory(senderManager));
- }
-
- /**
- * connect channel
- *
- * @return
- */
- public boolean connect() {
- if (checkConnect(this.channel)) {
- return true;
- }
- try {
- if (client == null) {
- init();
- }
-
- synchronized (client) {
- ChannelFuture future = client.connect(this.addr).sync();
- this.channel = future.channel();
- Attribute<String> attr =
this.channel.attr(SenderGroup.CHANNEL_KEY);
- attr.set(channelKey);
- }
- } catch (Throwable e) {
- LOG.error("connect {} failed. {}", this.getAddr(), e.getMessage());
- return false;
- }
- return true;
- }
-
- /**
- * check channeel connect
- *
- * @param channel
- * @return
- */
- private boolean checkConnect(Channel channel) {
- try {
- if (channel == null) {
- return false;
- }
- if (channel.isWritable() || channel.isOpen() ||
channel.isActive()) {
- return true;
- }
- } catch (Throwable ex) {
- LOG.error("check connect ex." + ex.getMessage());
- }
- return false;
- }
-}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
deleted file mode 100644
index 40e287bd77..0000000000
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.send;
-
-import org.apache.inlong.audit.util.SenderResult;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.util.Attribute;
-import io.netty.util.AttributeKey;
-import org.apache.commons.lang.math.NumberUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class SenderGroup {
-
- public static final Logger LOG =
LoggerFactory.getLogger(SenderGroup.class);
- public static final AttributeKey<String> CHANNEL_KEY =
AttributeKey.newInstance("channelKey");
- // maximum number of sending
- public static final int MAX_SEND_TIMES = 3;
- public static final int DEFAULT_WAIT_TIMES = 10000;
- public static final int WAIT_INTERVAL = 1;
- public static final int DEFAULT_SYNCH_REQUESTS = 1;
- public static final int RANDOM_MIN = 0;
-
- private List<LinkedBlockingQueue<SenderChannel>> channelGroups = new
ArrayList<>();
- private int mIndex = 0;
- private List<SenderChannel> deleteChannels = new ArrayList<>();
- private ConcurrentHashMap<String, SenderChannel> totalChannels = new
ConcurrentHashMap<>();
-
- private int waitChannelTimes = DEFAULT_WAIT_TIMES;
- private int waitChannelIntervalMs = WAIT_INTERVAL;
- private int maxSynchRequest = DEFAULT_SYNCH_REQUESTS;
- private boolean hasSendError = false;
-
- private SenderManager senderManager;
-
- private AtomicLong channelId = new AtomicLong(0);
-
- /**
- * constructor
- *
- * @param senderManager
- */
- public SenderGroup(SenderManager senderManager) {
- this.senderManager = senderManager;
- /*
- * init add two list for update config
- */
- channelGroups.add(new LinkedBlockingQueue<>());
- channelGroups.add(new LinkedBlockingQueue<>());
- }
-
- /**
- * send data
- *
- * @param dataBuf
- * @return
- */
- public SenderResult send(ByteBuf dataBuf) {
- if (dataBuf == null) {
- return new SenderResult("dataBuf is null", 0, false);
- }
- LinkedBlockingQueue<SenderChannel> channels =
channelGroups.get(mIndex);
- SenderChannel channel = null;
- boolean dataBufReleased = false;
- try {
- if (channels.size() <= 0) {
- LOG.error("channels is empty");
- dataBuf.release();
- dataBufReleased = true;
- return new SenderResult("channels is empty", 0, false);
- }
- boolean isOk = false;
- // tryAcquire
- for (int tryIndex = 0; tryIndex < MAX_SEND_TIMES; tryIndex++) {
- for (int i = 0; i < channels.size(); i++) {
- channel = channels.poll();
- if (channel.tryAcquire()) {
- if (channel.connect()) {
- isOk = true;
- break;
- }
- channel.release();
- }
- channels.offer(channel);
- channel = null;
- }
-
- if (isOk) {
- break;
- }
-
- try {
- Thread.sleep(waitChannelIntervalMs);
- } catch (Throwable e) {
- LOG.error(e.getMessage());
- }
- }
- // acquire
- if (channel == null) {
- for (int i = 0; i < channels.size(); i++) {
- channel = channels.poll();
- if (!channel.connect()) {
- channels.offer(channel);
- channel = null;
- continue;
- }
- if (channel.acquire()) {
- break;
- }
- }
- }
- // error
- if (channel == null) {
- LOG.error("can not get a channel");
- dataBuf.release();
- dataBufReleased = true;
- return new SenderResult("can not get a channel", 0, false);
- }
-
- ChannelFuture t = null;
- if (channel.getChannel().isWritable()) {
- t = channel.getChannel().writeAndFlush(dataBuf).sync().await();
- if (!t.isSuccess()) {
- if (!channel.getChannel().isActive()) {
- channel.connect();
- }
- t =
channel.getChannel().writeAndFlush(dataBuf).sync().await();
- }
- dataBufReleased = true;
- } else {
- dataBuf.release();
- dataBufReleased = true;
- }
- return new SenderResult(channel.getAddr().getHostString(),
channel.getAddr().getPort(), t.isSuccess());
- } catch (Throwable ex) {
- LOG.error(ex.getMessage(), ex);
- this.setHasSendError(true);
- return new SenderResult("127.0.0.1", 0, false);
- } finally {
- if (channel != null) {
- channel.release();
- channels.offer(channel);
- }
- if (!dataBufReleased) {
- dataBuf.release();
- }
- }
- }
-
- public void release(Channel channel) {
- Attribute<String> attr = channel.attr(CHANNEL_KEY);
- String key = attr.get();
- if (key == null) {
- return;
- }
- SenderChannel senderChannel = this.totalChannels.get(key);
- if (senderChannel != null) {
- senderChannel.release();
- }
- }
-
- /**
- * update config
- *
- * @param ipLists
- */
- public void updateConfig(List<String> ipLists) {
- try {
- for (SenderChannel dc : deleteChannels) {
- if (dc.getChannel() != null) {
- try {
- dc.getChannel().disconnect();
- dc.getChannel().close();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }
- deleteChannels.clear();
- int newIndex = mIndex ^ 0x01;
- LinkedBlockingQueue<SenderChannel> newChannels =
this.channelGroups.get(newIndex);
- newChannels.clear();
- List<String> waitingDeleteChannelKey = new
ArrayList<>(totalChannels.size());
- waitingDeleteChannelKey.addAll(totalChannels.keySet());
- for (String ipPort : ipLists) {
- try {
- InetSocketAddress addr = parseAddress(ipPort);
- if (addr == null) {
- continue;
- }
- String key = String.valueOf(channelId.getAndIncrement());
- SenderChannel channel = new SenderChannel(addr,
maxSynchRequest, senderManager);
- channel.setChannelKey(key);
- newChannels.add(channel);
- totalChannels.put(key, channel);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- waitingDeleteChannelKey.forEach(v ->
deleteChannels.add(totalChannels.remove(v)));
- this.mIndex = newIndex;
- } catch (Throwable e) {
- LOG.error("Update Sender Ip Failed." + e.getMessage(), e);
- }
- }
-
- /**
- * parseAddress
- *
- * @param InetSocketAddress
- * @return
- */
- private static InetSocketAddress parseAddress(String ipPort) {
- String[] splits = ipPort.split(":");
- if (splits.length == 2) {
- String strIp = splits[0];
- String strPort = splits[1];
- int port = NumberUtils.toInt(strPort, 0);
- if (port > 0) {
- return new InetSocketAddress(strIp, port);
- }
- }
- return null;
- }
-
- /**
- * get hasSendError
- *
- * @return the hasSendError
- */
- public boolean isHasSendError() {
- return hasSendError;
- }
-
- /**
- * set hasSendError
- *
- * @param hasSendError the hasSendError to set
- */
- public void setHasSendError(boolean hasSendError) {
- this.hasSendError = hasSendError;
- }
-
-}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
deleted file mode 100644
index c5f5c481e9..0000000000
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.send;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-
-public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(SenderHandler.class);
- private final SenderManager manager;
-
- /**
- * Constructor
- */
- public SenderHandler(SenderManager manager) {
- this.manager = manager;
- }
-
- /**
- * Message Received
- */
- @Override
- public void channelRead0(io.netty.channel.ChannelHandlerContext ctx,
byte[] e) {
- try {
- manager.release(ctx.channel());
- manager.onMessageReceived(ctx, e);
- } catch (Throwable ex) {
- LOGGER.error("channelRead0 error: ", ex);
- }
- }
-
- /**
- * Caught exception
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
- try {
- manager.release(ctx.channel());
- manager.onExceptionCaught(ctx, e);
- } catch (Throwable ex) {
- LOGGER.error("caught exception: ", ex);
- }
- }
-
- /**
- * Disconnected channel
- */
- @Override
- public void channelInactive(ChannelHandlerContext ctx) {
- try {
- manager.release(ctx.channel());
- super.channelInactive(ctx);
- } catch (Throwable ex) {
- LOGGER.error("channelInactive error: ", ex);
- }
- }
-
- /**
- * Closed channel
- */
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws
Exception {
- try {
- manager.release(ctx.channel());
- super.channelUnregistered(ctx);
- } catch (Throwable ex) {
- LOGGER.error("channelUnregistered error: ", ex);
- }
- }
-
- /**
- * parseInetSocketAddress
- *
- * @param channel
- * @return
- */
- public static InetSocketAddress parseInetSocketAddress(Channel channel) {
- InetSocketAddress destAddr = null;
- if (channel.remoteAddress() instanceof InetSocketAddress) {
- destAddr = (InetSocketAddress) channel.remoteAddress();
- } else if (channel.remoteAddress() != null) {
- String sendIp = channel.remoteAddress().toString();
- destAddr = new InetSocketAddress(sendIp, 0);
- } else {
- destAddr = new InetSocketAddress("127.0.0.1", 0);
- }
- return destAddr;
- }
-}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index 6868ba5297..4da90f9f0d 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -20,12 +20,7 @@ package org.apache.inlong.audit.send;
import org.apache.inlong.audit.protocol.AuditApi;
import org.apache.inlong.audit.util.AuditConfig;
import org.apache.inlong.audit.util.AuditData;
-import org.apache.inlong.audit.util.SenderResult;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,147 +28,161 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
/**
* Audit sender manager
*/
public class SenderManager {
- public static final Long MAX_REQUEST_ID = 1000000000L;
- public static final int ALL_CONNECT_CHANNEL = -1;
- public static final int DEFAULT_CONNECT_CHANNEL = 2;
- public static final Logger LOG =
LoggerFactory.getLogger(SenderManager.class);
- private static final int SEND_INTERVAL_MS = 20;
- private final SecureRandom sRandom = new
SecureRandom(Long.toString(System.currentTimeMillis()).getBytes());
- private final AtomicLong requestIdSeq = new AtomicLong(0L);
- private final ConcurrentHashMap<Long, AuditData> dataMap = new
ConcurrentHashMap<>();
- private final LinkedBlockingQueue<Long> requestIdQueue = new
LinkedBlockingQueue<>();
-
- private SenderGroup sender;
- private int maxConnectChannels = ALL_CONNECT_CHANNEL;
- // IPList
- private List<String> currentIpPorts = new ArrayList<>();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SenderManager.class);
+ private static final int SEND_INTERVAL_MS = 100;
+ private final ConcurrentHashMap<Long, AuditData> failedDataMap = new
ConcurrentHashMap<>();
private AuditConfig auditConfig;
- private long lastCheckTime = System.currentTimeMillis();
+ private Socket socket = new Socket();
+ private static final int PACKAGE_HEADER_LEN = 4;
+ private static final int MAX_RESPONSE_LENGTH = 32 * 1024;
- /**
- * Constructor
- */
public SenderManager(AuditConfig config) {
- this(config, DEFAULT_CONNECT_CHANNEL);
- }
-
- /**
- * Constructor
- */
- public SenderManager(AuditConfig config, int maxConnectChannels) {
- try {
- this.auditConfig = config;
- this.maxConnectChannels = maxConnectChannels;
- this.sender = new SenderGroup(this);
- } catch (Exception ex) {
- LOG.error(ex.getMessage(), ex);
- }
+ auditConfig = config;
}
- /**
- * update config
- */
- public void setAuditProxy(HashSet<String> ipPortList) {
- if (ipPortList.equals(currentIpPorts) &&
!this.sender.isHasSendError()) {
+ public void closeSocket() {
+ if (socket.isClosed()) {
+ LOGGER.info("Audit socket is already closed");
return;
}
- this.sender.setHasSendError(false);
- List<String> newIpPorts = new ArrayList<>();
- newIpPorts.addAll(ipPortList);
- this.currentIpPorts = newIpPorts;
- int ipSize = ipPortList.size();
- int needNewSize;
- if (this.maxConnectChannels == ALL_CONNECT_CHANNEL ||
this.maxConnectChannels >= ipSize) {
- needNewSize = ipSize;
- } else {
- needNewSize = maxConnectChannels;
+ try {
+ socket.close();
+ LOGGER.info("Audit socket closed successfully");
+ } catch (IOException exception) {
+ LOGGER.error("Error closing audit socket", exception);
}
+ }
- List<String> updateConfigIpLists = new ArrayList<>();
- List<String> availableIpLists = new ArrayList<>(ipPortList);
- for (int i = 0; i < needNewSize; i++) {
- int availableIpSize = availableIpLists.size();
- int newIpPortIndex = this.sRandom.nextInt(availableIpSize);
- String ipPort = availableIpLists.remove(newIpPortIndex);
- updateConfigIpLists.add(ipPort);
- }
- LOG.info("needNewSize:{},updateConfigIpLists:{}", needNewSize,
updateConfigIpLists);
- if (updateConfigIpLists.size() > 0) {
- this.sender.updateConfig(updateConfigIpLists);
+ public boolean checkSocket() {
+ if (socket.isClosed() || !socket.isConnected()) {
+ try {
+ InetSocketAddress inetSocketAddress =
ProxyManager.getInstance().getInetSocketAddress();
+ if (inetSocketAddress == null) {
+ LOGGER.error("Audit inet socket address is null!");
+ return false;
+ }
+ reconnect(inetSocketAddress, auditConfig.getSocketTimeout());
+ } catch (IOException exception) {
+ LOGGER.error("Connect to {} has exception!",
socket.getInetAddress(), exception);
+ return false;
+ }
}
+ return socket.isConnected();
}
- /**
- * next request id
- */
- public Long nextRequestId() {
- long requestId = requestIdSeq.getAndIncrement();
- if (requestId > MAX_REQUEST_ID) {
- requestId = 0L;
- requestIdSeq.set(requestId);
- }
- return requestId;
+ private void reconnect(InetSocketAddress inetSocketAddress, int timeout)
+ throws IOException {
+ socket = new Socket();
+ socket.connect(inetSocketAddress, timeout);
+ socket.setSoTimeout(timeout);
}
/**
* Send data with command
*/
- public void send(AuditApi.BaseCommand baseCommand, AuditApi.AuditRequest
auditRequest) {
+ public boolean send(AuditApi.BaseCommand baseCommand,
AuditApi.AuditRequest auditRequest) {
AuditData data = new AuditData(baseCommand, auditRequest);
- // cache first
- Long requestId = baseCommand.getAuditRequest().getRequestId();
- this.dataMap.putIfAbsent(requestId, data);
- this.sendData(data.getDataByte());
+ for (int retry = 0; retry < auditConfig.getRetryTimes(); retry++) {
+ if (sendData(data.getDataByte())) {
+ return true;
+ }
+ LOGGER.warn("Failed to send data on attempt {}. Retrying...",
retry + 1);
+ sleep();
+ }
+
+ LOGGER.error("Failed to send data after {} attempts. Storing data for
later retry.",
+ auditConfig.getRetryTimes());
+
failedDataMap.putIfAbsent(baseCommand.getAuditRequest().getRequestId(), data);
+ return false;
+ }
+
+ private void readFully(InputStream is, byte[] buffer, int len) throws
IOException {
+ int bytesRead;
+ int totalBytesRead = 0;
+ while (totalBytesRead < len
+ && (bytesRead = is.read(buffer, totalBytesRead, len -
totalBytesRead)) != -1) {
+ totalBytesRead += bytesRead;
+ }
}
/**
* Send data byte array
*/
- private void sendData(byte[] data) {
- if (data == null || data.length <= 0) {
- LOG.warn("send data is empty!");
- return;
+ private boolean sendData(byte[] data) {
+ if (data == null || data.length == 0) {
+ LOGGER.warn("Send data is empty!");
+ return false;
}
- ByteBuf dataBuf = ByteBufAllocator.DEFAULT.buffer(data.length);
- dataBuf.writeBytes(data);
- SenderResult result = this.sender.send(dataBuf);
- if (!result.result) {
- this.sender.setHasSendError(true);
+ if (!checkSocket()) {
+ return false;
+ }
+ try {
+ OutputStream outputStream = socket.getOutputStream();
+ InputStream inputStream = socket.getInputStream();
+
+ outputStream.write(data);
+
+ byte[] header = new byte[PACKAGE_HEADER_LEN];
+ readFully(inputStream, header, PACKAGE_HEADER_LEN);
+
+ int bodyLen = ((header[0] & 0xFF) << 24) |
+ ((header[1] & 0xFF) << 16) |
+ ((header[2] & 0xFF) << 8) |
+ (header[3] & 0xFF);
+
+ if (bodyLen > MAX_RESPONSE_LENGTH) {
+ closeSocket();
+ return false;
+ }
+
+ byte[] body = new byte[bodyLen];
+ readFully(inputStream, body, bodyLen);
+
+ AuditApi.BaseCommand reply = AuditApi.BaseCommand.parseFrom(body);
+ return
AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(reply.getAuditReply().getRspCode());
+ } catch (IOException exception) {
+ closeSocket();
+ LOGGER.error("Send audit data to proxy has exception!", exception);
+ return false;
}
}
/**
* Clean up the backlog of unsent message packets
*/
- public void clearBuffer() {
- LOG.info("Audit failed cache size: {}", this.dataMap.size());
- for (AuditData data : this.dataMap.values()) {
- this.sendData(data.getDataByte());
- this.sleep();
+ public void checkFailedData() {
+ LOGGER.info("Audit failed cache size: {}", failedDataMap.size());
+
+ Iterator<Map.Entry<Long, AuditData>> iterator =
failedDataMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, AuditData> entry = iterator.next();
+ if (sendData(entry.getValue().getDataByte())) {
+ iterator.remove();
+ sleep();
+ }
}
- if (this.dataMap.size() == 0) {
+ if (failedDataMap.isEmpty()) {
checkAuditFile();
}
- if (this.dataMap.size() > auditConfig.getMaxCacheRow()) {
- LOG.info("failed cache size: {}>{}", this.dataMap.size(),
auditConfig.getMaxCacheRow());
+ if (failedDataMap.size() > auditConfig.getMaxCacheRow()) {
+ LOGGER.info("Failed cache size: {} > {}", failedDataMap.size(),
auditConfig.getMaxCacheRow());
writeLocalFile();
- this.dataMap.clear();
+ failedDataMap.clear();
}
}
@@ -181,29 +190,37 @@ public class SenderManager {
* write local file
*/
private void writeLocalFile() {
+ if (!checkFilePath()) {
+ LOGGER.error("{} is not exist!", auditConfig.getFilePath());
+ return;
+ }
+
+ File file = new File(auditConfig.getDisasterFile());
+
try {
- if (!checkFilePath()) {
- return;
- }
- File file = new File(auditConfig.getDisasterFile());
- if (!file.exists()) {
+ if (file.exists()) {
+ if (file.length() > auditConfig.getMaxFileSize()) {
+ if (!file.delete()) {
+ LOGGER.error("Failed to delete file: {}",
file.getAbsolutePath());
+ return;
+ }
+ LOGGER.info("Deleted file due to exceeding max file size:
{}", file.getAbsolutePath());
+ }
+ } else {
if (!file.createNewFile()) {
- LOG.error("create file {} failed",
auditConfig.getDisasterFile());
+ LOGGER.error("Failed to create file: {}",
file.getAbsolutePath());
return;
}
- LOG.info("create file {} success",
auditConfig.getDisasterFile());
+ LOGGER.info("Created file: {}", file.getAbsolutePath());
}
- if (file.length() > auditConfig.getMaxFileSize()) {
- file.delete();
- return;
+
+ try (FileOutputStream fos = new FileOutputStream(file);
+ ObjectOutputStream objectOutputStream = new
ObjectOutputStream(fos)) {
+
+ objectOutputStream.writeObject(failedDataMap);
}
- FileOutputStream fos = new FileOutputStream(file);
- ObjectOutputStream objectOutputStream = new
ObjectOutputStream(fos);
- objectOutputStream.writeObject(dataMap);
- objectOutputStream.close();
- fos.close();
} catch (IOException e) {
- LOG.error("write local file error:{}", e.getMessage(), e);
+ LOGGER.error("Error writing to local file: {}", e.getMessage(), e);
}
}
@@ -214,9 +231,10 @@ public class SenderManager {
File file = new File(auditConfig.getFilePath());
if (!file.exists()) {
if (!file.mkdirs()) {
+ LOGGER.error("Create file {} failed!",
auditConfig.getFilePath());
return false;
}
- LOG.info("create file {} success", auditConfig.getFilePath());
+ LOGGER.info("Create file {} success", auditConfig.getFilePath());
}
return true;
}
@@ -225,27 +243,30 @@ public class SenderManager {
* check audit file
*/
private void checkAuditFile() {
- try {
- File file = new File(auditConfig.getDisasterFile());
- if (!file.exists()) {
- return;
- }
- FileInputStream inputStream = new
FileInputStream(auditConfig.getDisasterFile());
- ObjectInputStream objectStream = new
ObjectInputStream(inputStream);
+ File file = new File(auditConfig.getDisasterFile());
+ if (!file.exists()) {
+ return;
+ }
+
+ try (FileInputStream inputStream = new FileInputStream(file);
+ ObjectInputStream objectStream = new
ObjectInputStream(inputStream)) {
+
ConcurrentHashMap<Long, AuditData> fileData =
(ConcurrentHashMap<Long, AuditData>) objectStream
.readObject();
+
for (Map.Entry<Long, AuditData> entry : fileData.entrySet()) {
- if (this.dataMap.size() < (auditConfig.getMaxCacheRow() / 2)) {
- this.dataMap.putIfAbsent(entry.getKey(), entry.getValue());
+ if (failedDataMap.size() < (auditConfig.getMaxCacheRow() / 2))
{
+ failedDataMap.putIfAbsent(entry.getKey(),
entry.getValue());
}
- this.sendData(entry.getValue().getDataByte());
- this.sleep();
+ sendData(entry.getValue().getDataByte());
+ sleep();
}
- objectStream.close();
- inputStream.close();
- file.delete();
} catch (IOException | ClassNotFoundException e) {
- LOG.error("check audit file error:{}", e.getMessage(), e);
+ LOGGER.error("check audit file error:{}", e.getMessage(), e);
+ } finally {
+ if (!file.delete()) {
+ LOGGER.error("Failed to delete file: {}",
file.getAbsolutePath());
+ }
}
}
@@ -253,60 +274,7 @@ public class SenderManager {
* get data map size
*/
public int getDataMapSize() {
- return this.dataMap.size();
- }
-
- /**
- * processing return package
- */
- public void onMessageReceived(ChannelHandlerContext ctx, byte[] msg) {
- if (null == msg) {
- return;
- }
- try {
- // Analyze abnormal events
- AuditApi.BaseCommand baseCommand =
AuditApi.BaseCommand.parseFrom(msg);
- // Parse request id
- Long requestId = baseCommand.getAuditReply().getRequestId();
- AuditData data = this.dataMap.get(requestId);
- if (data == null) {
- LOG.error("Can not find the request id onMessageReceived
{},message: {}",
- requestId, baseCommand.getAuditReply().getMessage());
- if (LOG.isDebugEnabled()) {
- for (Map.Entry<Long, AuditData> entry :
this.dataMap.entrySet()) {
- LOG.debug("Data map key:{}, request id:{}",
entry.getKey(), requestId);
- }
- }
- return;
- }
- // Check audit-proxy response code
- LOG.info("Audit-proxy response code: {}",
baseCommand.getAuditReply().getRspCode());
- if
(AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(baseCommand.getAuditReply().getRspCode()))
{
- this.dataMap.remove(requestId);
- return;
- }
- LOG.error("Audit-proxy has error response! code={}, message={}",
- baseCommand.getAuditReply().getRspCode(),
baseCommand.getAuditReply().getMessage());
-
- if (data.increaseResendTimes() < SenderGroup.MAX_SEND_TIMES) {
- this.sendData(data.getDataByte());
- }
- } catch (Throwable ex) {
- LOG.error("Receive Message exception:{}", ex.getMessage(), ex);
- this.sender.setHasSendError(true);
- }
- }
-
- /**
- * Handle the packet return exception
- */
- public void onExceptionCaught(ChannelHandlerContext ctx, Throwable e) {
- LOG.error("channel context " + ctx + " occurred exception: ", e);
- try {
- this.sender.setHasSendError(true);
- } catch (Throwable ex) {
- LOG.error("setHasSendError error:{}", ex.getMessage(), ex);
- }
+ return this.failedDataMap.size();
}
/**
@@ -316,7 +284,7 @@ public class SenderManager {
try {
Thread.sleep(SEND_INTERVAL_MS);
} catch (Throwable ex) {
- LOG.error("sleep error:{}", ex.getMessage(), ex);
+ LOGGER.error("sleep error:{}", ex.getMessage(), ex);
}
}
@@ -326,18 +294,4 @@ public class SenderManager {
public void setAuditConfig(AuditConfig config) {
auditConfig = config;
}
-
- public void release(Channel channel) {
- this.sender.release(channel);
- }
-
- /**
- * get dataMap
- *
- * @return the dataMap
- */
- public ConcurrentHashMap<Long, AuditData> getDataMap() {
- return dataMap;
- }
-
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditConfig.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditConfig.java
index c7ac2f3efa..11f6e09b38 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditConfig.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditConfig.java
@@ -17,20 +17,25 @@
package org.apache.inlong.audit.util;
+import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Data
public class AuditConfig {
private static final Logger logger =
LoggerFactory.getLogger(AuditConfig.class);
- private static String FILE_PATH = "/data/inlong/audit/";
- private static final int FILE_SIZE = 500 * 1024 * 1024;
- private static final int MAX_CACHE_ROWS = 2000000;
+ private static String FILE_PATH = "./inLong-audit/";
+ private static final int FILE_SIZE = 100 * 1024 * 1024;
+ private static final int MAX_CACHE_ROWS = 1000000;
private static final int MIN_CACHE_ROWS = 100;
private String filePath;
private int maxCacheRow;
private int maxFileSize = FILE_SIZE;
+ private String disasterFileName = "disaster.data";
+ private int socketTimeout = 30000;
+ private int retryTimes = 2;
public AuditConfig(String filePath, int maxCacheRow) {
if (filePath == null || filePath.length() == 0) {
@@ -50,33 +55,8 @@ public class AuditConfig {
this.maxCacheRow = MAX_CACHE_ROWS;
}
- public void setFilePath(String filePath) {
- this.filePath = filePath;
- }
-
- public void setMaxCacheRow(int maxCacheRow) {
- this.maxCacheRow = maxCacheRow;
- }
-
- public String getFilePath() {
- return filePath;
- }
-
- public int getMaxCacheRow() {
- return maxCacheRow;
- }
-
- public int getMaxFileSize() {
- return maxFileSize;
- }
-
- public void setMaxFileSize(int maxFileSize) {
- this.maxFileSize = maxFileSize;
- }
-
public String getDisasterFile() {
return filePath + "/" + disasterFileName;
}
- private String disasterFileName = "disaster.data";
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java
deleted file mode 100644
index af15d2a0ad..0000000000
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Decoder.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.util;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageDecoder;
-
-import java.util.List;
-
-public class Decoder extends MessageToMessageDecoder<ByteBuf> {
-
- // Maximum return packet size
- private static final int MAX_RESPONSE_LENGTH = 8 * 1024 * 1024;
-
- /**
- * decoding
- */
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,
- List<Object> out) throws Exception {
- // Every time you need to read the complete package (that is, read to
the end of the package),
- // otherwise only the first one will be parsed correctly,
- // which will adversely affect the parsing of the subsequent package
- buffer.markReaderIndex();
- // Packet composition: 4 bytes length content + ProtocolBuffer content
- int totalLen = buffer.readInt();
- // Respond to abnormal channel, interrupt in time to avoid stuck
- if (totalLen > MAX_RESPONSE_LENGTH) {
- ctx.channel().close();
- return;
- }
- // If the package is not complete, continue to wait for the return
package
- if (buffer.readableBytes() < totalLen) {
- buffer.resetReaderIndex();
- return;
- }
- byte[] returnBuffer = new byte[totalLen];
- buffer.readBytes(returnBuffer, 0, totalLen);
- out.add(returnBuffer);
- }
-}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/EventLoopUtil.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/EventLoopUtil.java
deleted file mode 100644
index 80e400338f..0000000000
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/EventLoopUtil.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.util;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollChannelOption;
-import io.netty.channel.epoll.EpollDatagramChannel;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollMode;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.DatagramChannel;
-import io.netty.channel.socket.ServerSocketChannel;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioDatagramChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.Future;
-
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadFactory;
-
-public class EventLoopUtil {
-
- public EventLoopUtil() {
- }
-
- public static EventLoopGroup newEventLoopGroup(int nThreads, boolean
enableBusyWait, ThreadFactory threadFactory) {
- if (!Epoll.isAvailable()) {
- return new NioEventLoopGroup(nThreads, threadFactory);
- } else if (!enableBusyWait) {
- return new EpollEventLoopGroup(nThreads, threadFactory);
- } else {
- EpollEventLoopGroup eventLoopGroup = new
EpollEventLoopGroup(nThreads, threadFactory, () -> {
- return (selectSupplier, hasTasks) -> {
- return -3;
- };
- });
- return eventLoopGroup;
- }
- }
-
- public static Class<? extends SocketChannel>
getClientSocketChannelClass(EventLoopGroup eventLoopGroup) {
- return eventLoopGroup instanceof EpollEventLoopGroup
- ? EpollSocketChannel.class
- : NioSocketChannel.class;
- }
-
- public static Class<? extends ServerSocketChannel>
getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
- return eventLoopGroup instanceof EpollEventLoopGroup
- ? EpollServerSocketChannel.class
- : NioServerSocketChannel.class;
- }
-
- public static Class<? extends DatagramChannel>
getDatagramChannelClass(EventLoopGroup eventLoopGroup) {
- return eventLoopGroup instanceof EpollEventLoopGroup
- ? EpollDatagramChannel.class
- : NioDatagramChannel.class;
- }
-
- public static void enableTriggeredMode(ServerBootstrap bootstrap) {
- if (Epoll.isAvailable()) {
- bootstrap.childOption(EpollChannelOption.EPOLL_MODE,
EpollMode.LEVEL_TRIGGERED);
- }
-
- }
-
- public static CompletableFuture<Void> shutdownGracefully(EventLoopGroup
eventLoopGroup) {
- return toCompletableFutureVoid(eventLoopGroup.shutdownGracefully());
- }
-
- /**
- * get CompletableFuture by Future
- *
- * @param future Future
- * @return CompletableFuture
- */
- public static CompletableFuture<Void> toCompletableFutureVoid(Future<?>
future) {
- Objects.requireNonNull(future, "future cannot be null");
-
- CompletableFuture<Void> adapter = new CompletableFuture<>();
- if (future.isDone()) {
- if (future.isSuccess()) {
- adapter.complete(null);
- } else {
- adapter.completeExceptionally(future.cause());
- }
- } else {
- future.addListener(f -> {
- if (f.isSuccess()) {
- adapter.complete(null);
- } else {
- adapter.completeExceptionally(f.cause());
- }
- });
- }
- return adapter;
- }
-}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/RequestIdUtils.java
similarity index 59%
rename from
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
rename to
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/RequestIdUtils.java
index 99632002ea..8575e9afca 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/SenderResult.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/RequestIdUtils.java
@@ -17,33 +17,22 @@
package org.apache.inlong.audit.util;
-import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicLong;
-public class SenderResult {
+public class RequestIdUtils {
- public final InetSocketAddress addr;
- public boolean result;
+ private static final Long MAX_REQUEST_ID = 1000000000L;
+ private static final AtomicLong requestIdSeq = new AtomicLong(0L);
/**
- * Constructor
- *
- * @param addr
- * @param result
+ * Next request id
*/
- public SenderResult(InetSocketAddress addr, boolean result) {
- this.addr = addr;
- this.result = result;
- }
-
- /**
- * Constructor
- *
- * @param sendIp
- * @param sendPort
- * @param result
- */
- public SenderResult(String sendIp, int sendPort, boolean result) {
- this.addr = new InetSocketAddress(sendIp, sendPort);
- this.result = result;
+ public static Long nextRequestId() {
+ long requestId = requestIdSeq.getAndIncrement();
+ if (requestId > MAX_REQUEST_ID) {
+ requestId = 0L;
+ requestIdSeq.set(requestId);
+ }
+ return requestId;
}
}
diff --git
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/ProxyManagerTest.java
similarity index 52%
copy from
inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
copy to
inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/ProxyManagerTest.java
index da936e188e..4fdad0f0c3 100644
---
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
+++
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/ProxyManagerTest.java
@@ -17,34 +17,28 @@
package org.apache.inlong.audit.send;
-import org.apache.inlong.audit.util.AuditConfig;
-
import org.junit.Test;
-import static org.junit.Assert.assertTrue;
-
-public class SenderManagerTest {
-
- private AuditConfig testConfig = new AuditConfig();
+import java.net.InetSocketAddress;
+import java.util.HashSet;
- @Test
- public void nextRequestId() {
- SenderManager testManager = new SenderManager(testConfig);
- Long requestId = testManager.nextRequestId();
- assertTrue(requestId == 0);
-
- requestId = testManager.nextRequestId();
- assertTrue(requestId == 1);
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
- requestId = testManager.nextRequestId();
- assertTrue(requestId == 2);
- }
+public class ProxyManagerTest {
@Test
- public void clearBuffer() {
- SenderManager testManager = new SenderManager(testConfig);
- testManager.clearBuffer();
- int dataMapSize = testManager.getDataMapSize();
- assertTrue(dataMapSize == 0);
+ public void testProxyManager() {
+ HashSet<String> ipPortList = new HashSet<>();
+ ipPortList.add("172.0.0.1:10081");
+ ipPortList.add("172.0.0.2:10081");
+ ipPortList.add("172.0.0.3:10081");
+ ipPortList.add("172.0.0.4:10081");
+ ipPortList.add("172.0.0.5:10081");
+ ProxyManager.getInstance().setAuditProxy(ipPortList);
+ InetSocketAddress inetSocketAddress =
ProxyManager.getInstance().getInetSocketAddress();
+ assertEquals(10081, inetSocketAddress.getPort());
+
assertTrue(inetSocketAddress.getAddress().getHostAddress().startsWith("172.0.0.")
+ && inetSocketAddress.getAddress().getHostAddress().length() ==
9);
}
-}
\ No newline at end of file
+}
diff --git
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
deleted file mode 100644
index 19d73b7476..0000000000
---
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.audit.send;
-
-import org.apache.inlong.audit.util.AuditConfig;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class SenderGroupTest {
-
- AuditConfig testConfig = new AuditConfig();
- SenderManager testManager = new SenderManager(testConfig);
- SenderHandler clientHandler = new
org.apache.inlong.audit.send.SenderHandler(testManager);
- SenderGroup sender = new
org.apache.inlong.audit.send.SenderGroup(testManager);
-
- @Test
- public void isHasSendError() {
- sender.setHasSendError(false);
- boolean isError = sender.isHasSendError();
- assertFalse(isError);
- sender.setHasSendError(true);
- isError = sender.isHasSendError();
- assertTrue(isError);
- }
-
- @Test
- public void setHasSendError() {
- sender.setHasSendError(false);
- boolean isError = sender.isHasSendError();
- assertFalse(isError);
- sender.setHasSendError(true);
- isError = sender.isHasSendError();
- assertTrue(isError);
- }
-}
\ No newline at end of file
diff --git
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
index da936e188e..ea53645b4b 100644
---
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
+++
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderManagerTest.java
@@ -27,23 +27,10 @@ public class SenderManagerTest {
private AuditConfig testConfig = new AuditConfig();
- @Test
- public void nextRequestId() {
- SenderManager testManager = new SenderManager(testConfig);
- Long requestId = testManager.nextRequestId();
- assertTrue(requestId == 0);
-
- requestId = testManager.nextRequestId();
- assertTrue(requestId == 1);
-
- requestId = testManager.nextRequestId();
- assertTrue(requestId == 2);
- }
-
@Test
public void clearBuffer() {
SenderManager testManager = new SenderManager(testConfig);
- testManager.clearBuffer();
+ testManager.checkFailedData();
int dataMapSize = testManager.getDataMapSize();
assertTrue(dataMapSize == 0);
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/RequestIdUtilsTest.java
similarity index 56%
rename from
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
rename to
inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/RequestIdUtilsTest.java
index 233677ef59..1a10ef95db 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
+++
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/RequestIdUtilsTest.java
@@ -15,24 +15,24 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.send;
+package org.apache.inlong.audit.util;
-import org.apache.inlong.audit.util.Decoder;
+import org.junit.Test;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
+import static org.junit.Assert.assertTrue;
-public class ClientPipelineFactory extends ChannelInitializer<SocketChannel> {
+public class RequestIdUtilsTest {
- private SenderManager senderManager;
+ @Test
+ public void testNextRequestId() {
- public ClientPipelineFactory(SenderManager senderManager) {
- this.senderManager = senderManager;
- }
+ Long requestId = RequestIdUtils.nextRequestId();
+ assertTrue(requestId == 0);
+
+ requestId = RequestIdUtils.nextRequestId();
+ assertTrue(requestId == 1);
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast("contentDecoder", new Decoder());
- ch.pipeline().addLast("handler", new SenderHandler(senderManager));
+ requestId = RequestIdUtils.nextRequestId();
+ assertTrue(requestId == 2);
}
}